Skip to main content

arrow_select/
take.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Defines take kernel for [Array]
19
20use std::fmt::Display;
21use std::mem::ManuallyDrop;
22use std::sync::Arc;
23
24use arrow_array::builder::{BufferBuilder, UInt32Builder};
25use arrow_array::cast::AsArray;
26use arrow_array::types::*;
27use arrow_array::*;
28use arrow_buffer::{
29    ArrowNativeType, BooleanBuffer, Buffer, MutableBuffer, NullBuffer, OffsetBuffer, ScalarBuffer,
30    bit_util,
31};
32use arrow_data::{ArrayDataBuilder, transform::MutableArrayData};
33use arrow_schema::{ArrowError, DataType, FieldRef, UnionMode};
34
35use num_traits::Zero;
36
37/// Take elements by index from [Array], creating a new [Array] from those indexes.
38///
39/// ```text
40/// ┌─────────────────┐      ┌─────────┐                              ┌─────────────────┐
41/// │        A        │      │    0    │                              │        A        │
42/// ├─────────────────┤      ├─────────┤                              ├─────────────────┤
43/// │        D        │      │    2    │                              │        B        │
44/// ├─────────────────┤      ├─────────┤   take(values, indices)      ├─────────────────┤
45/// │        B        │      │    3    │ ─────────────────────────▶   │        C        │
46/// ├─────────────────┤      ├─────────┤                              ├─────────────────┤
47/// │        C        │      │    1    │                              │        D        │
48/// ├─────────────────┤      └─────────┘                              └─────────────────┘
49/// │        E        │
50/// └─────────────────┘
51///    values array          indices array                              result
52/// ```
53///
54/// For selecting values by index from multiple arrays see [`crate::interleave`]
55///
56/// Note that this kernel, similar to other kernels in this crate,
57/// will avoid allocating where not necessary. Consequently
58/// the returned array may share buffers with the inputs
59///
60/// # Errors
61/// This function errors whenever:
62/// * An index cannot be casted to `usize` (typically 32 bit architectures)
63/// * An index is out of bounds and `options` is set to check bounds.
64///
65/// # Safety
66///
67/// When `options` is not set to check bounds, taking indexes after `len` will panic.
68///
69/// # See also
70/// * [`BatchCoalescer`]: to filter multiple [`RecordBatch`] and coalesce
71///   the results into a single array.
72///
73/// [`BatchCoalescer`]: crate::coalesce::BatchCoalescer
74///
75/// # Examples
76/// ```
77/// # use arrow_array::{StringArray, UInt32Array, cast::AsArray};
78/// # use arrow_select::take::take;
79/// let values = StringArray::from(vec!["zero", "one", "two"]);
80///
81/// // Take items at index 2, and 1:
82/// let indices = UInt32Array::from(vec![2, 1]);
83/// let taken = take(&values, &indices, None).unwrap();
84/// let taken = taken.as_string::<i32>();
85///
86/// assert_eq!(*taken, StringArray::from(vec!["two", "one"]));
87/// ```
88pub fn take(
89    values: &dyn Array,
90    indices: &dyn Array,
91    options: Option<TakeOptions>,
92) -> Result<ArrayRef, ArrowError> {
93    let options = options.unwrap_or_default();
94    downcast_integer_array!(
95        indices => {
96            if options.check_bounds {
97                check_bounds(values.len(), indices)?;
98            }
99            let indices = indices.to_indices();
100            take_impl(values, &indices)
101        },
102        d => Err(ArrowError::InvalidArgumentError(format!("Take only supported for integers, got {d:?}")))
103    )
104}
105
106/// For each [ArrayRef] in the [`Vec<ArrayRef>`], take elements by index and create a new
107/// [`Vec<ArrayRef>`] from those indices.
108///
109/// ```text
110/// ┌────────┬────────┐
111/// │        │        │           ┌────────┐                                ┌────────┬────────┐
112/// │   A    │   1    │           │        │                                │        │        │
113/// ├────────┼────────┤           │   0    │                                │   A    │   1    │
114/// │        │        │           ├────────┤                                ├────────┼────────┤
115/// │   D    │   4    │           │        │                                │        │        │
116/// ├────────┼────────┤           │   2    │  take_arrays(values,indices)   │   B    │   2    │
117/// │        │        │           ├────────┤                                ├────────┼────────┤
118/// │   B    │   2    │           │        │  ───────────────────────────►  │        │        │
119/// ├────────┼────────┤           │   3    │                                │   C    │   3    │
120/// │        │        │           ├────────┤                                ├────────┼────────┤
121/// │   C    │   3    │           │        │                                │        │        │
122/// ├────────┼────────┤           │   1    │                                │   D    │   4    │
123/// │        │        │           └────────┘                                └────────┼────────┘
124/// │   E    │   5    │
125/// └────────┴────────┘
126///    values arrays             indices array                                      result
127/// ```
128///
129/// # Errors
130/// This function errors whenever:
131/// * An index cannot be casted to `usize` (typically 32 bit architectures)
132/// * An index is out of bounds and `options` is set to check bounds.
133///
134/// # Safety
135///
136/// When `options` is not set to check bounds, taking indexes after `len` will panic.
137///
138/// # Examples
139/// ```
140/// # use std::sync::Arc;
141/// # use arrow_array::{StringArray, UInt32Array, cast::AsArray};
142/// # use arrow_select::take::{take, take_arrays};
143/// let string_values = Arc::new(StringArray::from(vec!["zero", "one", "two"]));
144/// let values = Arc::new(UInt32Array::from(vec![0, 1, 2]));
145///
146/// // Take items at index 2, and 1:
147/// let indices = UInt32Array::from(vec![2, 1]);
148/// let taken_arrays = take_arrays(&[string_values, values], &indices, None).unwrap();
149/// let taken_string = taken_arrays[0].as_string::<i32>();
150/// assert_eq!(*taken_string, StringArray::from(vec!["two", "one"]));
151/// let taken_values = taken_arrays[1].as_primitive();
152/// assert_eq!(*taken_values, UInt32Array::from(vec![2, 1]));
153/// ```
154pub fn take_arrays(
155    arrays: &[ArrayRef],
156    indices: &dyn Array,
157    options: Option<TakeOptions>,
158) -> Result<Vec<ArrayRef>, ArrowError> {
159    arrays
160        .iter()
161        .map(|array| take(array.as_ref(), indices, options.clone()))
162        .collect()
163}
164
165/// Verifies that the non-null values of `indices` are all `< len`
166fn check_bounds<T: ArrowPrimitiveType>(
167    len: usize,
168    indices: &PrimitiveArray<T>,
169) -> Result<(), ArrowError>
170where
171    T::Native: Display,
172{
173    let len = match T::Native::from_usize(len) {
174        Some(len) => len,
175        None => {
176            if T::DATA_TYPE.is_integer() {
177                // the biggest representable value for T::Native is lower than len, e.g: u8::MAX < 512, no need to check bounds
178                return Ok(());
179            } else {
180                return Err(ArrowError::ComputeError("Cast to usize failed".to_string()));
181            }
182        }
183    };
184
185    if indices.null_count() > 0 {
186        indices.iter().flatten().try_for_each(|index| {
187            if index >= len {
188                return Err(ArrowError::ComputeError(format!(
189                    "Array index out of bounds, cannot get item at index {index} from {len} entries"
190                )));
191            }
192            Ok(())
193        })
194    } else {
195        let in_bounds = indices.values().iter().fold(true, |in_bounds, &i| {
196            in_bounds & (i >= T::Native::ZERO) & (i < len)
197        });
198
199        if !in_bounds {
200            for &index in indices.values() {
201                if index < T::Native::ZERO || index >= len {
202                    return Err(ArrowError::ComputeError(format!(
203                        "Array index out of bounds, cannot get item at index {index} from {len} entries"
204                    )));
205                }
206            }
207        }
208
209        Ok(())
210    }
211}
212
213#[inline(never)]
214fn take_impl<IndexType: ArrowPrimitiveType>(
215    values: &dyn Array,
216    indices: &PrimitiveArray<IndexType>,
217) -> Result<ArrayRef, ArrowError> {
218    if indices.is_empty() {
219        return Ok(new_empty_array(values.data_type()));
220    }
221    downcast_primitive_array! {
222        values => Ok(Arc::new(take_primitive(values, indices)?)),
223        DataType::Boolean => {
224            let values = values.as_any().downcast_ref::<BooleanArray>().unwrap();
225            Ok(Arc::new(take_boolean(values, indices)))
226        }
227        DataType::Utf8 => {
228            Ok(Arc::new(take_bytes(values.as_string::<i32>(), indices)?))
229        }
230        DataType::LargeUtf8 => {
231            Ok(Arc::new(take_bytes(values.as_string::<i64>(), indices)?))
232        }
233        DataType::Utf8View => {
234            Ok(Arc::new(take_byte_view(values.as_string_view(), indices)?))
235        }
236        DataType::List(_) => {
237            Ok(Arc::new(take_list::<_, Int32Type>(values.as_list(), indices)?))
238        }
239        DataType::LargeList(_) => {
240            Ok(Arc::new(take_list::<_, Int64Type>(values.as_list(), indices)?))
241        }
242        DataType::ListView(_) => {
243            Ok(Arc::new(take_list_view::<_, Int32Type>(values.as_list_view(), indices)?))
244        }
245        DataType::LargeListView(_) => {
246            Ok(Arc::new(take_list_view::<_, Int64Type>(values.as_list_view(), indices)?))
247        }
248        DataType::FixedSizeList(_, length) => {
249            let values = values
250                .as_any()
251                .downcast_ref::<FixedSizeListArray>()
252                .unwrap();
253            Ok(Arc::new(take_fixed_size_list(
254                values,
255                indices,
256                *length as u32,
257            )?))
258        }
259        DataType::Map(_, _) => {
260            let list_arr = ListArray::from(values.as_map().clone());
261            let list_data = take_list::<_, Int32Type>(&list_arr, indices)?;
262            let builder = list_data.into_data().into_builder().data_type(values.data_type().clone());
263            Ok(Arc::new(MapArray::from(unsafe { builder.build_unchecked() })))
264        }
265        DataType::Struct(fields) => {
266            let array: &StructArray = values.as_struct();
267            let arrays  = array
268                .columns()
269                .iter()
270                .map(|a| take_impl(a.as_ref(), indices))
271                .collect::<Result<Vec<ArrayRef>, _>>()?;
272            let fields: Vec<(FieldRef, ArrayRef)> =
273                fields.iter().cloned().zip(arrays).collect();
274
275            // Create the null bit buffer.
276            let is_valid: Buffer = indices
277                .iter()
278                .map(|index| {
279                    if let Some(index) = index {
280                        array.is_valid(index.to_usize().unwrap())
281                    } else {
282                        false
283                    }
284                })
285                .collect();
286
287            if fields.is_empty() {
288                let nulls = NullBuffer::new(BooleanBuffer::new(is_valid, 0, indices.len()));
289                Ok(Arc::new(StructArray::new_empty_fields(indices.len(), Some(nulls))))
290            } else {
291                Ok(Arc::new(StructArray::from((fields, is_valid))) as ArrayRef)
292            }
293        }
294        DataType::Dictionary(_, _) => downcast_dictionary_array! {
295            values => Ok(Arc::new(take_dict(values, indices)?)),
296            t => unimplemented!("Take not supported for dictionary type {:?}", t)
297        }
298        DataType::RunEndEncoded(_, _) => downcast_run_array! {
299            values => Ok(Arc::new(take_run(values, indices)?)),
300            t => unimplemented!("Take not supported for run type {:?}", t)
301        }
302        DataType::Binary => {
303            Ok(Arc::new(take_bytes(values.as_binary::<i32>(), indices)?))
304        }
305        DataType::LargeBinary => {
306            Ok(Arc::new(take_bytes(values.as_binary::<i64>(), indices)?))
307        }
308        DataType::BinaryView => {
309            Ok(Arc::new(take_byte_view(values.as_binary_view(), indices)?))
310        }
311        DataType::FixedSizeBinary(size) => {
312            let values = values
313                .as_any()
314                .downcast_ref::<FixedSizeBinaryArray>()
315                .unwrap();
316            Ok(Arc::new(take_fixed_size_binary(values, indices, *size)?))
317        }
318        DataType::Null => {
319            // Take applied to a null array produces a null array.
320            if values.len() >= indices.len() {
321                // If the existing null array is as big as the indices, we can use a slice of it
322                // to avoid allocating a new null array.
323                Ok(values.slice(0, indices.len()))
324            } else {
325                // If the existing null array isn't big enough, create a new one.
326                Ok(new_null_array(&DataType::Null, indices.len()))
327            }
328        }
329        DataType::Union(fields, UnionMode::Sparse) => {
330            let mut children = Vec::with_capacity(fields.len());
331            let values = values.as_any().downcast_ref::<UnionArray>().unwrap();
332            let type_ids = take_native(values.type_ids(), indices);
333            for (type_id, _field) in fields.iter() {
334                let values = values.child(type_id);
335                let values = take_impl(values, indices)?;
336                children.push(values);
337            }
338            let array = UnionArray::try_new(fields.clone(), type_ids, None, children)?;
339            Ok(Arc::new(array))
340        }
341        DataType::Union(fields, UnionMode::Dense) => {
342            let values = values.as_any().downcast_ref::<UnionArray>().unwrap();
343
344            let type_ids = <PrimitiveArray<Int8Type>>::try_new(take_native(values.type_ids(), indices), None)?;
345            let offsets = <PrimitiveArray<Int32Type>>::try_new(take_native(values.offsets().unwrap(), indices), None)?;
346
347            let children = fields.iter()
348                .map(|(field_type_id, _)| {
349                    let mask = BooleanArray::from_unary(&type_ids, |value_type_id| value_type_id == field_type_id);
350
351                    let indices = crate::filter::filter(&offsets, &mask)?;
352
353                    let values = values.child(field_type_id);
354
355                    take_impl(values, indices.as_primitive::<Int32Type>())
356                })
357                .collect::<Result<_, _>>()?;
358
359            let mut child_offsets = [0; 128];
360
361            let offsets = type_ids.values()
362                .iter()
363                .map(|&i| {
364                    let offset = child_offsets[i as usize];
365
366                    child_offsets[i as usize] += 1;
367
368                    offset
369                })
370                .collect();
371
372            let (_, type_ids, _) = type_ids.into_parts();
373
374            let array = UnionArray::try_new(fields.clone(), type_ids, Some(offsets), children)?;
375
376            Ok(Arc::new(array))
377        }
378        t => unimplemented!("Take not supported for data type {:?}", t)
379    }
380}
381
382/// Options that define how `take` should behave
383#[derive(Clone, Debug, Default)]
384pub struct TakeOptions {
385    /// Perform bounds check before taking indices from values.
386    /// If enabled, an `ArrowError` is returned if the indices are out of bounds.
387    /// If not enabled, and indices exceed bounds, the kernel will panic.
388    pub check_bounds: bool,
389}
390
391/// `take` implementation for all primitive arrays
392///
393/// This checks if an `indices` slot is populated, and gets the value from `values`
394///  as the populated index.
395/// If the `indices` slot is null, a null value is returned.
396/// For example, given:
397///     values:  [1, 2, 3, null, 5]
398///     indices: [0, null, 4, 3]
399/// The result is: [1 (slot 0), null (null slot), 5 (slot 4), null (slot 3)]
400fn take_primitive<T, I>(
401    values: &PrimitiveArray<T>,
402    indices: &PrimitiveArray<I>,
403) -> Result<PrimitiveArray<T>, ArrowError>
404where
405    T: ArrowPrimitiveType,
406    I: ArrowPrimitiveType,
407{
408    let values_buf = take_native(values.values(), indices);
409    let nulls = take_nulls(values.nulls(), indices);
410    Ok(PrimitiveArray::try_new(values_buf, nulls)?.with_data_type(values.data_type().clone()))
411}
412
413#[inline(never)]
414fn take_nulls<I: ArrowPrimitiveType>(
415    values: Option<&NullBuffer>,
416    indices: &PrimitiveArray<I>,
417) -> Option<NullBuffer> {
418    match values.filter(|n| n.null_count() > 0) {
419        Some(n) => NullBuffer::from_unsliced_buffer(
420            take_bits(n.inner(), indices).into_inner(),
421            indices.len(),
422        ),
423        None => indices.nulls().cloned(),
424    }
425}
426
427#[inline(never)]
428fn take_native<T: ArrowNativeType, I: ArrowPrimitiveType>(
429    values: &[T],
430    indices: &PrimitiveArray<I>,
431) -> ScalarBuffer<T> {
432    match indices.nulls().filter(|n| n.null_count() > 0) {
433        Some(n) => indices
434            .values()
435            .iter()
436            .enumerate()
437            .map(|(idx, index)| match values.get(index.as_usize()) {
438                Some(v) => *v,
439                // SAFETY: idx<indices.len()
440                None => match unsafe { n.inner().value_unchecked(idx) } {
441                    false => T::default(),
442                    true => panic!("Out-of-bounds index {index:?}"),
443                },
444            })
445            .collect(),
446        None => indices
447            .values()
448            .iter()
449            .map(|index| values[index.as_usize()])
450            .collect(),
451    }
452}
453
454#[inline(never)]
455fn take_bits<I: ArrowPrimitiveType>(
456    values: &BooleanBuffer,
457    indices: &PrimitiveArray<I>,
458) -> BooleanBuffer {
459    let len = indices.len();
460
461    match indices.nulls().filter(|n| n.null_count() > 0) {
462        Some(nulls) => {
463            let mut output_buffer = MutableBuffer::new_null(len);
464            let output_slice = output_buffer.as_slice_mut();
465            nulls.valid_indices().for_each(|idx| {
466                // SAFETY: idx is a valid index in indices.nulls() --> idx<indices.len()
467                if values.value(unsafe { indices.value_unchecked(idx).as_usize() }) {
468                    // SAFETY: MutableBuffer was created with space for indices.len() bit, and idx < indices.len()
469                    unsafe { bit_util::set_bit_raw(output_slice.as_mut_ptr(), idx) };
470                }
471            });
472            BooleanBuffer::new(output_buffer.into(), 0, len)
473        }
474        None => {
475            BooleanBuffer::collect_bool(len, |idx: usize| {
476                // SAFETY: idx<indices.len()
477                values.value(unsafe { indices.value_unchecked(idx).as_usize() })
478            })
479        }
480    }
481}
482
483/// `take` implementation for boolean arrays
484fn take_boolean<IndexType: ArrowPrimitiveType>(
485    values: &BooleanArray,
486    indices: &PrimitiveArray<IndexType>,
487) -> BooleanArray {
488    let val_buf = take_bits(values.values(), indices);
489    let null_buf = take_nulls(values.nulls(), indices);
490    BooleanArray::new(val_buf, null_buf)
491}
492
493/// `take` implementation for string arrays
494fn take_bytes<T: ByteArrayType, IndexType: ArrowPrimitiveType>(
495    array: &GenericByteArray<T>,
496    indices: &PrimitiveArray<IndexType>,
497) -> Result<GenericByteArray<T>, ArrowError> {
498    let mut offsets = Vec::with_capacity(indices.len() + 1);
499    offsets.push(T::Offset::default());
500
501    let input_offsets = array.value_offsets();
502    let mut capacity = 0;
503    let nulls = take_nulls(array.nulls(), indices);
504
505    let (offsets, values) = if array.null_count() == 0 && indices.null_count() == 0 {
506        offsets.reserve(indices.len());
507        for index in indices.values() {
508            let index = index.as_usize();
509            capacity += input_offsets[index + 1].as_usize() - input_offsets[index].as_usize();
510            offsets.push(
511                T::Offset::from_usize(capacity)
512                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
513            );
514        }
515        let mut values = Vec::with_capacity(capacity);
516
517        for index in indices.values() {
518            values.extend_from_slice(array.value(index.as_usize()).as_ref());
519        }
520        (offsets, values)
521    } else if indices.null_count() == 0 {
522        offsets.reserve(indices.len());
523        for index in indices.values() {
524            let index = index.as_usize();
525            if array.is_valid(index) {
526                capacity += input_offsets[index + 1].as_usize() - input_offsets[index].as_usize();
527            }
528            offsets.push(
529                T::Offset::from_usize(capacity)
530                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
531            );
532        }
533        let mut values = Vec::with_capacity(capacity);
534
535        for index in indices.values() {
536            let index = index.as_usize();
537            if array.is_valid(index) {
538                values.extend_from_slice(array.value(index).as_ref());
539            }
540        }
541        (offsets, values)
542    } else if array.null_count() == 0 {
543        offsets.reserve(indices.len());
544        for (i, index) in indices.values().iter().enumerate() {
545            let index = index.as_usize();
546            if indices.is_valid(i) {
547                capacity += input_offsets[index + 1].as_usize() - input_offsets[index].as_usize();
548            }
549            offsets.push(
550                T::Offset::from_usize(capacity)
551                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
552            );
553        }
554        let mut values = Vec::with_capacity(capacity);
555
556        for (i, index) in indices.values().iter().enumerate() {
557            if indices.is_valid(i) {
558                values.extend_from_slice(array.value(index.as_usize()).as_ref());
559            }
560        }
561        (offsets, values)
562    } else {
563        let nulls = nulls.as_ref().unwrap();
564        offsets.reserve(indices.len());
565        for (i, index) in indices.values().iter().enumerate() {
566            let index = index.as_usize();
567            if nulls.is_valid(i) {
568                capacity += input_offsets[index + 1].as_usize() - input_offsets[index].as_usize();
569            }
570            offsets.push(
571                T::Offset::from_usize(capacity)
572                    .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
573            );
574        }
575        let mut values = Vec::with_capacity(capacity);
576
577        for (i, index) in indices.values().iter().enumerate() {
578            // check index is valid before using index. The value in
579            // NULL index slots may not be within bounds of array
580            let index = index.as_usize();
581            if nulls.is_valid(i) {
582                values.extend_from_slice(array.value(index).as_ref());
583            }
584        }
585        (offsets, values)
586    };
587
588    T::Offset::from_usize(values.len())
589        .ok_or_else(|| ArrowError::OffsetOverflowError(values.len()))?;
590
591    let array = unsafe {
592        let offsets = OffsetBuffer::new_unchecked(offsets.into());
593        GenericByteArray::<T>::new_unchecked(offsets, values.into(), nulls)
594    };
595
596    Ok(array)
597}
598
599/// `take` implementation for byte view arrays
600fn take_byte_view<T: ByteViewType, IndexType: ArrowPrimitiveType>(
601    array: &GenericByteViewArray<T>,
602    indices: &PrimitiveArray<IndexType>,
603) -> Result<GenericByteViewArray<T>, ArrowError> {
604    let new_views = take_native(array.views(), indices);
605    let new_nulls = take_nulls(array.nulls(), indices);
606    // Safety:  array.views was valid, and take_native copies only valid values, and verifies bounds
607    Ok(unsafe {
608        GenericByteViewArray::new_unchecked(new_views, array.data_buffers().to_vec(), new_nulls)
609    })
610}
611
612/// `take` implementation for list arrays
613///
614/// Copies the selected list entries' child slices into a new child array
615/// via `MutableArrayData`, then reconstructs a list array with new offsets
616fn take_list<IndexType, OffsetType>(
617    values: &GenericListArray<OffsetType::Native>,
618    indices: &PrimitiveArray<IndexType>,
619) -> Result<GenericListArray<OffsetType::Native>, ArrowError>
620where
621    IndexType: ArrowPrimitiveType,
622    OffsetType: ArrowPrimitiveType,
623    OffsetType::Native: OffsetSizeTrait,
624    PrimitiveArray<OffsetType>: From<Vec<OffsetType::Native>>,
625{
626    let list_offsets = values.value_offsets();
627    let child_data = values.values().to_data();
628    let nulls = take_nulls(values.nulls(), indices);
629
630    let mut new_offsets = Vec::with_capacity(indices.len() + 1);
631    new_offsets.push(OffsetType::Native::zero());
632
633    let use_nulls = child_data.null_count() > 0;
634
635    let capacity = child_data
636        .len()
637        .checked_div(values.len())
638        .map(|v| v * indices.len())
639        .unwrap_or_default();
640
641    let mut array_data = MutableArrayData::new(vec![&child_data], use_nulls, capacity);
642
643    match nulls.as_ref().filter(|n| n.null_count() > 0) {
644        None => {
645            for index in indices.values() {
646                let ix = index.as_usize();
647                let start = list_offsets[ix].as_usize();
648                let end = list_offsets[ix + 1].as_usize();
649                array_data.extend(0, start, end);
650                new_offsets.push(OffsetType::Native::from_usize(array_data.len()).unwrap());
651            }
652        }
653        Some(output_nulls) => {
654            assert_eq!(output_nulls.len(), indices.len());
655
656            let mut last_filled = 0;
657            for i in output_nulls.valid_indices() {
658                let current = OffsetType::Native::from_usize(array_data.len()).unwrap();
659                // Filling offsets for the null values between the two valid indices
660                if last_filled < i {
661                    new_offsets.extend(std::iter::repeat_n(current, i - last_filled));
662                }
663
664                // SAFETY: `i` comes from validity bitmap over `indices`, so in-bounds.
665                let ix = unsafe { indices.value_unchecked(i) }.as_usize();
666                let start = list_offsets[ix].as_usize();
667                let end = list_offsets[ix + 1].as_usize();
668                array_data.extend(0, start, end);
669                new_offsets.push(OffsetType::Native::from_usize(array_data.len()).unwrap());
670                last_filled = i + 1;
671            }
672
673            // Filling offsets for null values at the end
674            let final_offset = OffsetType::Native::from_usize(array_data.len()).unwrap();
675            new_offsets.extend(std::iter::repeat_n(
676                final_offset,
677                indices.len() - last_filled,
678            ));
679        }
680    };
681
682    assert_eq!(
683        new_offsets.len(),
684        indices.len() + 1,
685        "New offsets was filled under/over the expected capacity"
686    );
687
688    let child_data = array_data.freeze();
689    let value_offsets = Buffer::from_vec(new_offsets);
690
691    let list_data = ArrayDataBuilder::new(values.data_type().clone())
692        .len(indices.len())
693        .nulls(nulls)
694        .offset(0)
695        .add_child_data(child_data)
696        .add_buffer(value_offsets);
697
698    let list_data = unsafe { list_data.build_unchecked() };
699    Ok(GenericListArray::<OffsetType::Native>::from(list_data))
700}
701
702fn take_list_view<IndexType, OffsetType>(
703    values: &GenericListViewArray<OffsetType::Native>,
704    indices: &PrimitiveArray<IndexType>,
705) -> Result<GenericListViewArray<OffsetType::Native>, ArrowError>
706where
707    IndexType: ArrowPrimitiveType,
708    OffsetType: ArrowPrimitiveType,
709    OffsetType::Native: OffsetSizeTrait,
710{
711    let taken_offsets = take_native(values.offsets(), indices);
712    let taken_sizes = take_native(values.sizes(), indices);
713    let nulls = take_nulls(values.nulls(), indices);
714
715    let list_view_data = ArrayDataBuilder::new(values.data_type().clone())
716        .len(indices.len())
717        .nulls(nulls)
718        .buffers(vec![taken_offsets.into(), taken_sizes.into()])
719        .child_data(vec![values.values().to_data()]);
720
721    // SAFETY: all buffers and child nodes for ListView added in constructor
722    let list_view_data = unsafe { list_view_data.build_unchecked() };
723
724    Ok(GenericListViewArray::<OffsetType::Native>::from(
725        list_view_data,
726    ))
727}
728
729/// `take` implementation for `FixedSizeListArray`
730///
731/// Calculates the index and indexed offset for the inner array,
732/// applying `take` on the inner array, then reconstructing a list array
733/// with the indexed offsets
734fn take_fixed_size_list<IndexType: ArrowPrimitiveType>(
735    values: &FixedSizeListArray,
736    indices: &PrimitiveArray<IndexType>,
737    length: <UInt32Type as ArrowPrimitiveType>::Native,
738) -> Result<FixedSizeListArray, ArrowError> {
739    let list_indices = take_value_indices_from_fixed_size_list(values, indices, length)?;
740    let taken = take_impl::<UInt32Type>(values.values().as_ref(), &list_indices)?;
741
742    // determine null count and null buffer, which are a function of `values` and `indices`
743    let num_bytes = bit_util::ceil(indices.len(), 8);
744    let mut null_buf = MutableBuffer::new(num_bytes).with_bitset(num_bytes, true);
745    let null_slice = null_buf.as_slice_mut();
746
747    for i in 0..indices.len() {
748        let index = indices
749            .value(i)
750            .to_usize()
751            .ok_or_else(|| ArrowError::ComputeError("Cast to usize failed".to_string()))?;
752        if !indices.is_valid(i) || values.is_null(index) {
753            bit_util::unset_bit(null_slice, i);
754        }
755    }
756
757    let list_data = ArrayDataBuilder::new(values.data_type().clone())
758        .len(indices.len())
759        .null_bit_buffer(Some(null_buf.into()))
760        .offset(0)
761        .add_child_data(taken.into_data());
762
763    let list_data = unsafe { list_data.build_unchecked() };
764
765    Ok(FixedSizeListArray::from(list_data))
766}
767
768/// The take kernel implementation for `FixedSizeBinaryArray`.
769///
770/// The computation is done in two steps:
771/// - Compute the values buffer
772/// - Compute the null buffer
773fn take_fixed_size_binary<IndexType: ArrowPrimitiveType>(
774    values: &FixedSizeBinaryArray,
775    indices: &PrimitiveArray<IndexType>,
776    size: i32,
777) -> Result<FixedSizeBinaryArray, ArrowError> {
778    let size_usize = usize::try_from(size).map_err(|_| {
779        ArrowError::InvalidArgumentError(format!("Cannot convert size '{}' to usize", size))
780    })?;
781
782    let result_buffer = match size_usize {
783        1 => take_fixed_size::<IndexType, 1>(values.values(), indices),
784        2 => take_fixed_size::<IndexType, 2>(values.values(), indices),
785        4 => take_fixed_size::<IndexType, 4>(values.values(), indices),
786        8 => take_fixed_size::<IndexType, 8>(values.values(), indices),
787        16 => take_fixed_size::<IndexType, 16>(values.values(), indices),
788        _ => take_fixed_size_binary_buffer_dynamic_length(values, indices, size_usize),
789    };
790
791    let value_nulls = take_nulls(values.nulls(), indices);
792    let final_nulls = NullBuffer::union(value_nulls.as_ref(), indices.nulls());
793    let array_data = ArrayDataBuilder::new(DataType::FixedSizeBinary(size))
794        .len(indices.len())
795        .nulls(final_nulls)
796        .offset(0)
797        .add_buffer(result_buffer)
798        .build()?;
799
800    return Ok(FixedSizeBinaryArray::from(array_data));
801
802    /// Implementation of the take kernel for fixed size binary arrays.
803    #[inline(never)]
804    fn take_fixed_size_binary_buffer_dynamic_length<IndexType: ArrowPrimitiveType>(
805        values: &FixedSizeBinaryArray,
806        indices: &PrimitiveArray<IndexType>,
807        size_usize: usize,
808    ) -> Buffer {
809        let values_buffer = values.values().as_slice();
810        let mut values_buffer_builder = BufferBuilder::new(indices.len() * size_usize);
811
812        if indices.null_count() == 0 {
813            let array_iter = indices.values().iter().map(|idx| {
814                let offset = idx.as_usize() * size_usize;
815                &values_buffer[offset..offset + size_usize]
816            });
817            for slice in array_iter {
818                values_buffer_builder.append_slice(slice);
819            }
820        } else {
821            // The indices nullability cannot be ignored here because the values buffer may contain
822            // nulls which should not cause a panic.
823            let array_iter = indices.iter().map(|idx| {
824                idx.map(|idx| {
825                    let offset = idx.as_usize() * size_usize;
826                    &values_buffer[offset..offset + size_usize]
827                })
828            });
829            for slice in array_iter {
830                match slice {
831                    None => values_buffer_builder.append_n(size_usize, 0),
832                    Some(slice) => values_buffer_builder.append_slice(slice),
833                }
834            }
835        }
836
837        values_buffer_builder.finish()
838    }
839}
840
841/// Implements the take kernel semantics over a flat [`Buffer`], interpreting it as a slice of
842/// `&[[u8; N]]`, where `N` is a compile-time constant. The usage of a flat [`Buffer`] allows using
843/// this kernel without an available [`ArrowPrimitiveType`] (e.g., for `[u8; 5]`).
844///
845/// # Using This Function in the Primitive Take Kernel
846///
847/// This function is basically the same as [`take_native`] but just on a flat [`Buffer`] instead of
848/// the primitive [`ScalarBuffer`]. Ideally, the [`take_primitive`] kernel should just use this
849/// more general function. However, the "idiomatic code" requires the
850/// [feature(generic_const_exprs)](https://github.com/rust-lang/rust/issues/76560) for calling
851/// `take_fixed_size<I, { size_of::<T::Native> () } >(...)`. Once this feature has been stabilized,
852/// we can use this function also in the primitive kernels.
853fn take_fixed_size<IndexType: ArrowPrimitiveType, const N: usize>(
854    buffer: &Buffer,
855    indices: &PrimitiveArray<IndexType>,
856) -> Buffer {
857    assert_eq!(
858        buffer.len() % N,
859        0,
860        "Invalid array length in take_fixed_size"
861    );
862
863    let ptr = buffer.as_ptr();
864    let chunk_ptr = ptr.cast::<[u8; N]>();
865    let chunk_len = buffer.len() / N;
866    let buffer: &[[u8; N]] = unsafe {
867        // SAFETY: interpret an already valid slice as a slice of N-byte chunks. N divides buffer
868        // length without remainder.
869        std::slice::from_raw_parts(chunk_ptr, chunk_len)
870    };
871
872    let result_buffer = match indices.nulls().filter(|n| n.null_count() > 0) {
873        Some(n) => indices
874            .values()
875            .iter()
876            .enumerate()
877            .map(|(idx, index)| match buffer.get(index.as_usize()) {
878                Some(v) => *v,
879                // SAFETY: idx<indices.len()
880                None => match unsafe { n.inner().value_unchecked(idx) } {
881                    false => [0u8; N],
882                    true => panic!("Out-of-bounds index {index:?}"),
883                },
884            })
885            .collect::<Vec<_>>(),
886        None => indices
887            .values()
888            .iter()
889            .map(|index| buffer[index.as_usize()])
890            .collect::<Vec<_>>(),
891    };
892
893    let mut vec = ManuallyDrop::new(result_buffer); // Prevent de-allocation
894    let ptr = vec.as_mut_ptr();
895    let len = vec.len();
896    let cap = vec.capacity();
897    let result_buffer = unsafe {
898        // SAFETY: flattening an already valid Vec.
899        Vec::from_raw_parts(ptr.cast::<u8>(), len * N, cap * N)
900    };
901
902    Buffer::from_vec(result_buffer)
903}
904
905/// `take` implementation for dictionary arrays
906///
907/// applies `take` to the keys of the dictionary array and returns a new dictionary array
908/// with the same dictionary values and reordered keys
909fn take_dict<T: ArrowDictionaryKeyType, I: ArrowPrimitiveType>(
910    values: &DictionaryArray<T>,
911    indices: &PrimitiveArray<I>,
912) -> Result<DictionaryArray<T>, ArrowError> {
913    let new_keys = take_primitive(values.keys(), indices)?;
914    Ok(unsafe { DictionaryArray::new_unchecked(new_keys, values.values().clone()) })
915}
916
917/// `take` implementation for run arrays
918///
919/// Finds physical indices for the given logical indices and builds output run array
920/// by taking values in the input run_array.values at the physical indices.
921/// The output run array will be run encoded on the physical indices and not on output values.
922/// For e.g. an input `RunArray{ run_ends = [2,4,6,8], values=[1,2,1,2] }` and `logical_indices=[2,3,6,7]`
923/// would be converted to `physical_indices=[1,1,3,3]` which will be used to build
924/// output `RunArray{ run_ends=[2,4], values=[2,2] }`.
925fn take_run<T: RunEndIndexType, I: ArrowPrimitiveType>(
926    run_array: &RunArray<T>,
927    logical_indices: &PrimitiveArray<I>,
928) -> Result<RunArray<T>, ArrowError> {
929    // get physical indices for the input logical indices
930    let physical_indices = run_array.get_physical_indices(logical_indices.values())?;
931
932    // Run encode the physical indices into new_run_ends_builder
933    // Keep track of the physical indices to take in take_value_indices
934    // `unwrap` is used in this function because the unwrapped values are bounded by the corresponding `::Native`.
935    let mut new_run_ends_builder = BufferBuilder::<T::Native>::new(1);
936    let mut take_value_indices = BufferBuilder::<I::Native>::new(1);
937    let mut new_physical_len = 1;
938    for ix in 1..physical_indices.len() {
939        if physical_indices[ix] != physical_indices[ix - 1] {
940            take_value_indices.append(I::Native::from_usize(physical_indices[ix - 1]).unwrap());
941            new_run_ends_builder.append(T::Native::from_usize(ix).unwrap());
942            new_physical_len += 1;
943        }
944    }
945    take_value_indices
946        .append(I::Native::from_usize(physical_indices[physical_indices.len() - 1]).unwrap());
947    new_run_ends_builder.append(T::Native::from_usize(physical_indices.len()).unwrap());
948    let new_run_ends = unsafe {
949        // Safety:
950        // The function builds a valid run_ends array and hence need not be validated.
951        ArrayDataBuilder::new(T::DATA_TYPE)
952            .len(new_physical_len)
953            .null_count(0)
954            .add_buffer(new_run_ends_builder.finish())
955            .build_unchecked()
956    };
957
958    let take_value_indices: PrimitiveArray<I> = unsafe {
959        // Safety:
960        // The function builds a valid take_value_indices array and hence need not be validated.
961        ArrayDataBuilder::new(I::DATA_TYPE)
962            .len(new_physical_len)
963            .null_count(0)
964            .add_buffer(take_value_indices.finish())
965            .build_unchecked()
966            .into()
967    };
968
969    let new_values = take(run_array.values(), &take_value_indices, None)?;
970
971    let builder = ArrayDataBuilder::new(run_array.data_type().clone())
972        .len(physical_indices.len())
973        .add_child_data(new_run_ends)
974        .add_child_data(new_values.into_data());
975    let array_data = unsafe {
976        // Safety:
977        //  This function builds a valid run array and hence can skip validation.
978        builder.build_unchecked()
979    };
980    Ok(array_data.into())
981}
982
983/// Takes/filters a fixed size list array's inner data using the offsets of the list array.
984fn take_value_indices_from_fixed_size_list<IndexType>(
985    list: &FixedSizeListArray,
986    indices: &PrimitiveArray<IndexType>,
987    length: <UInt32Type as ArrowPrimitiveType>::Native,
988) -> Result<PrimitiveArray<UInt32Type>, ArrowError>
989where
990    IndexType: ArrowPrimitiveType,
991{
992    let mut values = UInt32Builder::with_capacity(length as usize * indices.len());
993
994    for i in 0..indices.len() {
995        if indices.is_valid(i) {
996            let index = indices
997                .value(i)
998                .to_usize()
999                .ok_or_else(|| ArrowError::ComputeError("Cast to usize failed".to_string()))?;
1000            let start = list.value_offset(index) as <UInt32Type as ArrowPrimitiveType>::Native;
1001
1002            // Safety: Range always has known length.
1003            unsafe {
1004                values.append_trusted_len_iter(start..start + length);
1005            }
1006        } else {
1007            values.append_nulls(length as usize);
1008        }
1009    }
1010
1011    Ok(values.finish())
1012}
1013
1014/// To avoid generating take implementations for every index type, instead we
1015/// only generate for UInt32 and UInt64 and coerce inputs to these types
1016trait ToIndices {
1017    type T: ArrowPrimitiveType;
1018
1019    fn to_indices(&self) -> PrimitiveArray<Self::T>;
1020}
1021
1022macro_rules! to_indices_reinterpret {
1023    ($t:ty, $o:ty) => {
1024        impl ToIndices for PrimitiveArray<$t> {
1025            type T = $o;
1026
1027            fn to_indices(&self) -> PrimitiveArray<$o> {
1028                let cast = ScalarBuffer::new(self.values().inner().clone(), 0, self.len());
1029                PrimitiveArray::new(cast, self.nulls().cloned())
1030            }
1031        }
1032    };
1033}
1034
1035macro_rules! to_indices_identity {
1036    ($t:ty) => {
1037        impl ToIndices for PrimitiveArray<$t> {
1038            type T = $t;
1039
1040            fn to_indices(&self) -> PrimitiveArray<$t> {
1041                self.clone()
1042            }
1043        }
1044    };
1045}
1046
1047macro_rules! to_indices_widening {
1048    ($t:ty, $o:ty) => {
1049        impl ToIndices for PrimitiveArray<$t> {
1050            type T = UInt32Type;
1051
1052            fn to_indices(&self) -> PrimitiveArray<$o> {
1053                let cast = self.values().iter().copied().map(|x| x as _).collect();
1054                PrimitiveArray::new(cast, self.nulls().cloned())
1055            }
1056        }
1057    };
1058}
1059
1060to_indices_widening!(UInt8Type, UInt32Type);
1061to_indices_widening!(Int8Type, UInt32Type);
1062
1063to_indices_widening!(UInt16Type, UInt32Type);
1064to_indices_widening!(Int16Type, UInt32Type);
1065
1066to_indices_identity!(UInt32Type);
1067to_indices_reinterpret!(Int32Type, UInt32Type);
1068
1069to_indices_identity!(UInt64Type);
1070to_indices_reinterpret!(Int64Type, UInt64Type);
1071
1072/// Take rows by index from [`RecordBatch`] and returns a new [`RecordBatch`] from those indexes.
1073///
1074/// This function will call [`take`] on each array of the [`RecordBatch`] and assemble a new [`RecordBatch`].
1075///
1076/// # Example
1077/// ```
1078/// # use std::sync::Arc;
1079/// # use arrow_array::{StringArray, Int32Array, UInt32Array, RecordBatch};
1080/// # use arrow_schema::{DataType, Field, Schema};
1081/// # use arrow_select::take::take_record_batch;
1082/// let schema = Arc::new(Schema::new(vec![
1083///     Field::new("a", DataType::Int32, true),
1084///     Field::new("b", DataType::Utf8, true),
1085/// ]));
1086/// let batch = RecordBatch::try_new(
1087///     schema.clone(),
1088///     vec![
1089///         Arc::new(Int32Array::from_iter_values(0..20)),
1090///         Arc::new(StringArray::from_iter_values(
1091///             (0..20).map(|i| format!("str-{}", i)),
1092///         )),
1093///     ],
1094/// )
1095/// .unwrap();
1096///
1097/// let indices = UInt32Array::from(vec![1, 5, 10]);
1098/// let taken = take_record_batch(&batch, &indices).unwrap();
1099///
1100/// let expected = RecordBatch::try_new(
1101///     schema,
1102///     vec![
1103///         Arc::new(Int32Array::from(vec![1, 5, 10])),
1104///         Arc::new(StringArray::from(vec!["str-1", "str-5", "str-10"])),
1105///     ],
1106/// )
1107/// .unwrap();
1108/// assert_eq!(taken, expected);
1109/// ```
1110pub fn take_record_batch(
1111    record_batch: &RecordBatch,
1112    indices: &dyn Array,
1113) -> Result<RecordBatch, ArrowError> {
1114    let columns = record_batch
1115        .columns()
1116        .iter()
1117        .map(|c| take(c, indices, None))
1118        .collect::<Result<Vec<_>, _>>()?;
1119    RecordBatch::try_new(record_batch.schema(), columns)
1120}
1121
1122#[cfg(test)]
1123mod tests {
1124    use super::*;
1125    use arrow_array::builder::*;
1126    use arrow_buffer::{IntervalDayTime, IntervalMonthDayNano};
1127    use arrow_data::ArrayData;
1128    use arrow_schema::{Field, Fields, TimeUnit, UnionFields};
1129    use num_traits::ToPrimitive;
1130
1131    fn test_take_decimal_arrays(
1132        data: Vec<Option<i128>>,
1133        index: &UInt32Array,
1134        options: Option<TakeOptions>,
1135        expected_data: Vec<Option<i128>>,
1136        precision: &u8,
1137        scale: &i8,
1138    ) -> Result<(), ArrowError> {
1139        let output = data
1140            .into_iter()
1141            .collect::<Decimal128Array>()
1142            .with_precision_and_scale(*precision, *scale)
1143            .unwrap();
1144
1145        let expected = expected_data
1146            .into_iter()
1147            .collect::<Decimal128Array>()
1148            .with_precision_and_scale(*precision, *scale)
1149            .unwrap();
1150
1151        let expected = Arc::new(expected) as ArrayRef;
1152        let output = take(&output, index, options).unwrap();
1153        assert_eq!(&output, &expected);
1154        Ok(())
1155    }
1156
1157    fn test_take_boolean_arrays(
1158        data: Vec<Option<bool>>,
1159        index: &UInt32Array,
1160        options: Option<TakeOptions>,
1161        expected_data: Vec<Option<bool>>,
1162    ) {
1163        let output = BooleanArray::from(data);
1164        let expected = Arc::new(BooleanArray::from(expected_data)) as ArrayRef;
1165        let output = take(&output, index, options).unwrap();
1166        assert_eq!(&output, &expected)
1167    }
1168
1169    fn test_take_primitive_arrays<T>(
1170        data: Vec<Option<T::Native>>,
1171        index: &UInt32Array,
1172        options: Option<TakeOptions>,
1173        expected_data: Vec<Option<T::Native>>,
1174    ) -> Result<(), ArrowError>
1175    where
1176        T: ArrowPrimitiveType,
1177        PrimitiveArray<T>: From<Vec<Option<T::Native>>>,
1178    {
1179        let output = PrimitiveArray::<T>::from(data);
1180        let expected = Arc::new(PrimitiveArray::<T>::from(expected_data)) as ArrayRef;
1181        let output = take(&output, index, options)?;
1182        assert_eq!(&output, &expected);
1183        Ok(())
1184    }
1185
1186    fn test_take_primitive_arrays_non_null<T>(
1187        data: Vec<T::Native>,
1188        index: &UInt32Array,
1189        options: Option<TakeOptions>,
1190        expected_data: Vec<Option<T::Native>>,
1191    ) -> Result<(), ArrowError>
1192    where
1193        T: ArrowPrimitiveType,
1194        PrimitiveArray<T>: From<Vec<T::Native>>,
1195        PrimitiveArray<T>: From<Vec<Option<T::Native>>>,
1196    {
1197        let output = PrimitiveArray::<T>::from(data);
1198        let expected = Arc::new(PrimitiveArray::<T>::from(expected_data)) as ArrayRef;
1199        let output = take(&output, index, options)?;
1200        assert_eq!(&output, &expected);
1201        Ok(())
1202    }
1203
1204    fn test_take_impl_primitive_arrays<T, I>(
1205        data: Vec<Option<T::Native>>,
1206        index: &PrimitiveArray<I>,
1207        options: Option<TakeOptions>,
1208        expected_data: Vec<Option<T::Native>>,
1209    ) where
1210        T: ArrowPrimitiveType,
1211        PrimitiveArray<T>: From<Vec<Option<T::Native>>>,
1212        I: ArrowPrimitiveType,
1213    {
1214        let output = PrimitiveArray::<T>::from(data);
1215        let expected = PrimitiveArray::<T>::from(expected_data);
1216        let output = take(&output, index, options).unwrap();
1217        let output = output.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap();
1218        assert_eq!(output, &expected)
1219    }
1220
1221    // create a simple struct for testing purposes
1222    fn create_test_struct(values: Vec<Option<(Option<bool>, Option<i32>)>>) -> StructArray {
1223        let mut struct_builder = StructBuilder::new(
1224            Fields::from(vec![
1225                Field::new("a", DataType::Boolean, true),
1226                Field::new("b", DataType::Int32, true),
1227            ]),
1228            vec![
1229                Box::new(BooleanBuilder::with_capacity(values.len())),
1230                Box::new(Int32Builder::with_capacity(values.len())),
1231            ],
1232        );
1233
1234        for value in values {
1235            struct_builder
1236                .field_builder::<BooleanBuilder>(0)
1237                .unwrap()
1238                .append_option(value.and_then(|v| v.0));
1239            struct_builder
1240                .field_builder::<Int32Builder>(1)
1241                .unwrap()
1242                .append_option(value.and_then(|v| v.1));
1243            struct_builder.append(value.is_some());
1244        }
1245        struct_builder.finish()
1246    }
1247
1248    #[test]
1249    fn test_take_decimal128_non_null_indices() {
1250        let index = UInt32Array::from(vec![0, 5, 3, 1, 4, 2]);
1251        let precision: u8 = 10;
1252        let scale: i8 = 5;
1253        test_take_decimal_arrays(
1254            vec![None, Some(3), Some(5), Some(2), Some(3), None],
1255            &index,
1256            None,
1257            vec![None, None, Some(2), Some(3), Some(3), Some(5)],
1258            &precision,
1259            &scale,
1260        )
1261        .unwrap();
1262    }
1263
1264    #[test]
1265    fn test_take_decimal128() {
1266        let index = UInt32Array::from(vec![Some(3), None, Some(1), Some(3), Some(2)]);
1267        let precision: u8 = 10;
1268        let scale: i8 = 5;
1269        test_take_decimal_arrays(
1270            vec![Some(0), Some(1), Some(2), Some(3), Some(4)],
1271            &index,
1272            None,
1273            vec![Some(3), None, Some(1), Some(3), Some(2)],
1274            &precision,
1275            &scale,
1276        )
1277        .unwrap();
1278    }
1279
1280    #[test]
1281    fn test_take_primitive_non_null_indices() {
1282        let index = UInt32Array::from(vec![0, 5, 3, 1, 4, 2]);
1283        test_take_primitive_arrays::<Int8Type>(
1284            vec![None, Some(3), Some(5), Some(2), Some(3), None],
1285            &index,
1286            None,
1287            vec![None, None, Some(2), Some(3), Some(3), Some(5)],
1288        )
1289        .unwrap();
1290    }
1291
1292    #[test]
1293    fn test_take_primitive_non_null_values() {
1294        let index = UInt32Array::from(vec![Some(3), None, Some(1), Some(3), Some(2)]);
1295        test_take_primitive_arrays::<Int8Type>(
1296            vec![Some(0), Some(1), Some(2), Some(3), Some(4)],
1297            &index,
1298            None,
1299            vec![Some(3), None, Some(1), Some(3), Some(2)],
1300        )
1301        .unwrap();
1302    }
1303
1304    #[test]
1305    fn test_take_primitive_non_null() {
1306        let index = UInt32Array::from(vec![0, 5, 3, 1, 4, 2]);
1307        test_take_primitive_arrays::<Int8Type>(
1308            vec![Some(0), Some(3), Some(5), Some(2), Some(3), Some(1)],
1309            &index,
1310            None,
1311            vec![Some(0), Some(1), Some(2), Some(3), Some(3), Some(5)],
1312        )
1313        .unwrap();
1314    }
1315
1316    #[test]
1317    fn test_take_primitive_nullable_indices_non_null_values_with_offset() {
1318        let index = UInt32Array::from(vec![Some(0), Some(1), Some(2), Some(3), None, None]);
1319        let index = index.slice(2, 4);
1320        let index = index.as_any().downcast_ref::<UInt32Array>().unwrap();
1321
1322        assert_eq!(
1323            index,
1324            &UInt32Array::from(vec![Some(2), Some(3), None, None])
1325        );
1326
1327        test_take_primitive_arrays_non_null::<Int64Type>(
1328            vec![0, 10, 20, 30, 40, 50],
1329            index,
1330            None,
1331            vec![Some(20), Some(30), None, None],
1332        )
1333        .unwrap();
1334    }
1335
1336    #[test]
1337    fn test_take_primitive_nullable_indices_nullable_values_with_offset() {
1338        let index = UInt32Array::from(vec![Some(0), Some(1), Some(2), Some(3), None, None]);
1339        let index = index.slice(2, 4);
1340        let index = index.as_any().downcast_ref::<UInt32Array>().unwrap();
1341
1342        assert_eq!(
1343            index,
1344            &UInt32Array::from(vec![Some(2), Some(3), None, None])
1345        );
1346
1347        test_take_primitive_arrays::<Int64Type>(
1348            vec![None, None, Some(20), Some(30), Some(40), Some(50)],
1349            index,
1350            None,
1351            vec![Some(20), Some(30), None, None],
1352        )
1353        .unwrap();
1354    }
1355
1356    #[test]
1357    fn test_take_primitive() {
1358        let index = UInt32Array::from(vec![Some(3), None, Some(1), Some(3), Some(2)]);
1359
1360        // int8
1361        test_take_primitive_arrays::<Int8Type>(
1362            vec![Some(0), None, Some(2), Some(3), None],
1363            &index,
1364            None,
1365            vec![Some(3), None, None, Some(3), Some(2)],
1366        )
1367        .unwrap();
1368
1369        // int16
1370        test_take_primitive_arrays::<Int16Type>(
1371            vec![Some(0), None, Some(2), Some(3), None],
1372            &index,
1373            None,
1374            vec![Some(3), None, None, Some(3), Some(2)],
1375        )
1376        .unwrap();
1377
1378        // int32
1379        test_take_primitive_arrays::<Int32Type>(
1380            vec![Some(0), None, Some(2), Some(3), None],
1381            &index,
1382            None,
1383            vec![Some(3), None, None, Some(3), Some(2)],
1384        )
1385        .unwrap();
1386
1387        // int64
1388        test_take_primitive_arrays::<Int64Type>(
1389            vec![Some(0), None, Some(2), Some(3), None],
1390            &index,
1391            None,
1392            vec![Some(3), None, None, Some(3), Some(2)],
1393        )
1394        .unwrap();
1395
1396        // uint8
1397        test_take_primitive_arrays::<UInt8Type>(
1398            vec![Some(0), None, Some(2), Some(3), None],
1399            &index,
1400            None,
1401            vec![Some(3), None, None, Some(3), Some(2)],
1402        )
1403        .unwrap();
1404
1405        // uint16
1406        test_take_primitive_arrays::<UInt16Type>(
1407            vec![Some(0), None, Some(2), Some(3), None],
1408            &index,
1409            None,
1410            vec![Some(3), None, None, Some(3), Some(2)],
1411        )
1412        .unwrap();
1413
1414        // uint32
1415        test_take_primitive_arrays::<UInt32Type>(
1416            vec![Some(0), None, Some(2), Some(3), None],
1417            &index,
1418            None,
1419            vec![Some(3), None, None, Some(3), Some(2)],
1420        )
1421        .unwrap();
1422
1423        // int64
1424        test_take_primitive_arrays::<Int64Type>(
1425            vec![Some(0), None, Some(2), Some(-15), None],
1426            &index,
1427            None,
1428            vec![Some(-15), None, None, Some(-15), Some(2)],
1429        )
1430        .unwrap();
1431
1432        // interval_year_month
1433        test_take_primitive_arrays::<IntervalYearMonthType>(
1434            vec![Some(0), None, Some(2), Some(-15), None],
1435            &index,
1436            None,
1437            vec![Some(-15), None, None, Some(-15), Some(2)],
1438        )
1439        .unwrap();
1440
1441        // interval_day_time
1442        let v1 = IntervalDayTime::new(0, 0);
1443        let v2 = IntervalDayTime::new(2, 0);
1444        let v3 = IntervalDayTime::new(-15, 0);
1445        test_take_primitive_arrays::<IntervalDayTimeType>(
1446            vec![Some(v1), None, Some(v2), Some(v3), None],
1447            &index,
1448            None,
1449            vec![Some(v3), None, None, Some(v3), Some(v2)],
1450        )
1451        .unwrap();
1452
1453        // interval_month_day_nano
1454        let v1 = IntervalMonthDayNano::new(0, 0, 0);
1455        let v2 = IntervalMonthDayNano::new(2, 0, 0);
1456        let v3 = IntervalMonthDayNano::new(-15, 0, 0);
1457        test_take_primitive_arrays::<IntervalMonthDayNanoType>(
1458            vec![Some(v1), None, Some(v2), Some(v3), None],
1459            &index,
1460            None,
1461            vec![Some(v3), None, None, Some(v3), Some(v2)],
1462        )
1463        .unwrap();
1464
1465        // duration_second
1466        test_take_primitive_arrays::<DurationSecondType>(
1467            vec![Some(0), None, Some(2), Some(-15), None],
1468            &index,
1469            None,
1470            vec![Some(-15), None, None, Some(-15), Some(2)],
1471        )
1472        .unwrap();
1473
1474        // duration_millisecond
1475        test_take_primitive_arrays::<DurationMillisecondType>(
1476            vec![Some(0), None, Some(2), Some(-15), None],
1477            &index,
1478            None,
1479            vec![Some(-15), None, None, Some(-15), Some(2)],
1480        )
1481        .unwrap();
1482
1483        // duration_microsecond
1484        test_take_primitive_arrays::<DurationMicrosecondType>(
1485            vec![Some(0), None, Some(2), Some(-15), None],
1486            &index,
1487            None,
1488            vec![Some(-15), None, None, Some(-15), Some(2)],
1489        )
1490        .unwrap();
1491
1492        // duration_nanosecond
1493        test_take_primitive_arrays::<DurationNanosecondType>(
1494            vec![Some(0), None, Some(2), Some(-15), None],
1495            &index,
1496            None,
1497            vec![Some(-15), None, None, Some(-15), Some(2)],
1498        )
1499        .unwrap();
1500
1501        // float32
1502        test_take_primitive_arrays::<Float32Type>(
1503            vec![Some(0.0), None, Some(2.21), Some(-3.1), None],
1504            &index,
1505            None,
1506            vec![Some(-3.1), None, None, Some(-3.1), Some(2.21)],
1507        )
1508        .unwrap();
1509
1510        // float64
1511        test_take_primitive_arrays::<Float64Type>(
1512            vec![Some(0.0), None, Some(2.21), Some(-3.1), None],
1513            &index,
1514            None,
1515            vec![Some(-3.1), None, None, Some(-3.1), Some(2.21)],
1516        )
1517        .unwrap();
1518    }
1519
1520    #[test]
1521    fn test_take_preserve_timezone() {
1522        let index = Int64Array::from(vec![Some(0), None]);
1523
1524        let input = TimestampNanosecondArray::from(vec![
1525            1_639_715_368_000_000_000,
1526            1_639_715_368_000_000_000,
1527        ])
1528        .with_timezone("UTC".to_string());
1529        let result = take(&input, &index, None).unwrap();
1530        match result.data_type() {
1531            DataType::Timestamp(TimeUnit::Nanosecond, tz) => {
1532                assert_eq!(tz.clone(), Some("UTC".into()))
1533            }
1534            _ => panic!(),
1535        }
1536    }
1537
1538    #[test]
1539    fn test_take_impl_primitive_with_int64_indices() {
1540        let index = Int64Array::from(vec![Some(3), None, Some(1), Some(3), Some(2)]);
1541
1542        // int16
1543        test_take_impl_primitive_arrays::<Int16Type, Int64Type>(
1544            vec![Some(0), None, Some(2), Some(3), None],
1545            &index,
1546            None,
1547            vec![Some(3), None, None, Some(3), Some(2)],
1548        );
1549
1550        // int64
1551        test_take_impl_primitive_arrays::<Int64Type, Int64Type>(
1552            vec![Some(0), None, Some(2), Some(-15), None],
1553            &index,
1554            None,
1555            vec![Some(-15), None, None, Some(-15), Some(2)],
1556        );
1557
1558        // uint64
1559        test_take_impl_primitive_arrays::<UInt64Type, Int64Type>(
1560            vec![Some(0), None, Some(2), Some(3), None],
1561            &index,
1562            None,
1563            vec![Some(3), None, None, Some(3), Some(2)],
1564        );
1565
1566        // duration_millisecond
1567        test_take_impl_primitive_arrays::<DurationMillisecondType, Int64Type>(
1568            vec![Some(0), None, Some(2), Some(-15), None],
1569            &index,
1570            None,
1571            vec![Some(-15), None, None, Some(-15), Some(2)],
1572        );
1573
1574        // float32
1575        test_take_impl_primitive_arrays::<Float32Type, Int64Type>(
1576            vec![Some(0.0), None, Some(2.21), Some(-3.1), None],
1577            &index,
1578            None,
1579            vec![Some(-3.1), None, None, Some(-3.1), Some(2.21)],
1580        );
1581    }
1582
1583    #[test]
1584    fn test_take_impl_primitive_with_uint8_indices() {
1585        let index = UInt8Array::from(vec![Some(3), None, Some(1), Some(3), Some(2)]);
1586
1587        // int16
1588        test_take_impl_primitive_arrays::<Int16Type, UInt8Type>(
1589            vec![Some(0), None, Some(2), Some(3), None],
1590            &index,
1591            None,
1592            vec![Some(3), None, None, Some(3), Some(2)],
1593        );
1594
1595        // duration_millisecond
1596        test_take_impl_primitive_arrays::<DurationMillisecondType, UInt8Type>(
1597            vec![Some(0), None, Some(2), Some(-15), None],
1598            &index,
1599            None,
1600            vec![Some(-15), None, None, Some(-15), Some(2)],
1601        );
1602
1603        // float32
1604        test_take_impl_primitive_arrays::<Float32Type, UInt8Type>(
1605            vec![Some(0.0), None, Some(2.21), Some(-3.1), None],
1606            &index,
1607            None,
1608            vec![Some(-3.1), None, None, Some(-3.1), Some(2.21)],
1609        );
1610    }
1611
1612    #[test]
1613    fn test_take_bool() {
1614        let index = UInt32Array::from(vec![Some(3), None, Some(1), Some(3), Some(2)]);
1615        // boolean
1616        test_take_boolean_arrays(
1617            vec![Some(false), None, Some(true), Some(false), None],
1618            &index,
1619            None,
1620            vec![Some(false), None, None, Some(false), Some(true)],
1621        );
1622    }
1623
1624    #[test]
1625    fn test_take_bool_nullable_index() {
1626        // indices where the masked invalid elements would be out of bounds
1627        let index_data = ArrayData::try_new(
1628            DataType::UInt32,
1629            6,
1630            Some(Buffer::from_iter(vec![
1631                false, true, false, true, false, true,
1632            ])),
1633            0,
1634            vec![Buffer::from_iter(vec![99, 0, 999, 1, 9999, 2])],
1635            vec![],
1636        )
1637        .unwrap();
1638        let index = UInt32Array::from(index_data);
1639        test_take_boolean_arrays(
1640            vec![Some(true), None, Some(false)],
1641            &index,
1642            None,
1643            vec![None, Some(true), None, None, None, Some(false)],
1644        );
1645    }
1646
1647    #[test]
1648    fn test_take_bool_nullable_index_nonnull_values() {
1649        // indices where the masked invalid elements would be out of bounds
1650        let index_data = ArrayData::try_new(
1651            DataType::UInt32,
1652            6,
1653            Some(Buffer::from_iter(vec![
1654                false, true, false, true, false, true,
1655            ])),
1656            0,
1657            vec![Buffer::from_iter(vec![99, 0, 999, 1, 9999, 2])],
1658            vec![],
1659        )
1660        .unwrap();
1661        let index = UInt32Array::from(index_data);
1662        test_take_boolean_arrays(
1663            vec![Some(true), Some(true), Some(false)],
1664            &index,
1665            None,
1666            vec![None, Some(true), None, Some(true), None, Some(false)],
1667        );
1668    }
1669
1670    #[test]
1671    fn test_take_bool_with_offset() {
1672        let index = UInt32Array::from(vec![Some(3), None, Some(1), Some(3), Some(2), None]);
1673        let index = index.slice(2, 4);
1674        let index = index
1675            .as_any()
1676            .downcast_ref::<PrimitiveArray<UInt32Type>>()
1677            .unwrap();
1678
1679        // boolean
1680        test_take_boolean_arrays(
1681            vec![Some(false), None, Some(true), Some(false), None],
1682            index,
1683            None,
1684            vec![None, Some(false), Some(true), None],
1685        );
1686    }
1687
1688    fn _test_take_string<'a, K>()
1689    where
1690        K: Array + PartialEq + From<Vec<Option<&'a str>>> + 'static,
1691    {
1692        let index = UInt32Array::from(vec![Some(3), None, Some(1), Some(3), Some(4)]);
1693
1694        let array = K::from(vec![
1695            Some("one"),
1696            None,
1697            Some("three"),
1698            Some("four"),
1699            Some("five"),
1700        ]);
1701        let actual = take(&array, &index, None).unwrap();
1702        assert_eq!(actual.len(), index.len());
1703
1704        let actual = actual.as_any().downcast_ref::<K>().unwrap();
1705
1706        let expected = K::from(vec![Some("four"), None, None, Some("four"), Some("five")]);
1707
1708        assert_eq!(actual, &expected);
1709    }
1710
1711    #[test]
1712    fn test_take_string() {
1713        _test_take_string::<StringArray>()
1714    }
1715
1716    #[test]
1717    fn test_take_large_string() {
1718        _test_take_string::<LargeStringArray>()
1719    }
1720
1721    #[test]
1722    fn test_take_slice_string() {
1723        let strings = StringArray::from(vec![Some("hello"), None, Some("world"), None, Some("hi")]);
1724        let indices = Int32Array::from(vec![Some(0), Some(1), None, Some(0), Some(2)]);
1725        let indices_slice = indices.slice(1, 4);
1726        let expected = StringArray::from(vec![None, None, Some("hello"), Some("world")]);
1727        let result = take(&strings, &indices_slice, None).unwrap();
1728        assert_eq!(result.as_ref(), &expected);
1729    }
1730
1731    fn _test_byte_view<T>()
1732    where
1733        T: ByteViewType,
1734        str: AsRef<T::Native>,
1735        T::Native: PartialEq,
1736    {
1737        let index = UInt32Array::from(vec![Some(3), None, Some(1), Some(3), Some(4), Some(2)]);
1738        let array = {
1739            // ["hello", "world", null, "large payload over 12 bytes", "lulu"]
1740            let mut builder = GenericByteViewBuilder::<T>::new();
1741            builder.append_value("hello");
1742            builder.append_value("world");
1743            builder.append_null();
1744            builder.append_value("large payload over 12 bytes");
1745            builder.append_value("lulu");
1746            builder.finish()
1747        };
1748
1749        let actual = take(&array, &index, None).unwrap();
1750
1751        assert_eq!(actual.len(), index.len());
1752
1753        let expected = {
1754            // ["large payload over 12 bytes", null, "world", "large payload over 12 bytes", "lulu", null]
1755            let mut builder = GenericByteViewBuilder::<T>::new();
1756            builder.append_value("large payload over 12 bytes");
1757            builder.append_null();
1758            builder.append_value("world");
1759            builder.append_value("large payload over 12 bytes");
1760            builder.append_value("lulu");
1761            builder.append_null();
1762            builder.finish()
1763        };
1764
1765        assert_eq!(actual.as_ref(), &expected);
1766    }
1767
1768    #[test]
1769    fn test_take_string_view() {
1770        _test_byte_view::<StringViewType>()
1771    }
1772
1773    #[test]
1774    fn test_take_binary_view() {
1775        _test_byte_view::<BinaryViewType>()
1776    }
1777
1778    macro_rules! test_take_list {
1779        ($offset_type:ty, $list_data_type:ident, $list_array_type:ident) => {{
1780            // Construct a value array, [[0,0,0], [-1,-2,-1], [], [2,3]]
1781            let value_data = Int32Array::from(vec![0, 0, 0, -1, -2, -1, 2, 3]).into_data();
1782            // Construct offsets
1783            let value_offsets: [$offset_type; 5] = [0, 3, 6, 6, 8];
1784            let value_offsets = Buffer::from_slice_ref(&value_offsets);
1785            // Construct a list array from the above two
1786            let list_data_type =
1787                DataType::$list_data_type(Arc::new(Field::new_list_field(DataType::Int32, false)));
1788            let list_data = ArrayData::builder(list_data_type.clone())
1789                .len(4)
1790                .add_buffer(value_offsets)
1791                .add_child_data(value_data)
1792                .build()
1793                .unwrap();
1794            let list_array = $list_array_type::from(list_data);
1795
1796            // index returns: [[2,3], null, [-1,-2,-1], [], [0,0,0]]
1797            let index = UInt32Array::from(vec![Some(3), None, Some(1), Some(2), Some(0)]);
1798
1799            let a = take(&list_array, &index, None).unwrap();
1800            let a: &$list_array_type = a.as_any().downcast_ref::<$list_array_type>().unwrap();
1801
1802            // construct a value array with expected results:
1803            // [[2,3], null, [-1,-2,-1], [], [0,0,0]]
1804            let expected_data = Int32Array::from(vec![
1805                Some(2),
1806                Some(3),
1807                Some(-1),
1808                Some(-2),
1809                Some(-1),
1810                Some(0),
1811                Some(0),
1812                Some(0),
1813            ])
1814            .into_data();
1815            // construct offsets
1816            let expected_offsets: [$offset_type; 6] = [0, 2, 2, 5, 5, 8];
1817            let expected_offsets = Buffer::from_slice_ref(&expected_offsets);
1818            // construct list array from the two
1819            let expected_list_data = ArrayData::builder(list_data_type)
1820                .len(5)
1821                // null buffer remains the same as only the indices have nulls
1822                .nulls(index.nulls().cloned())
1823                .add_buffer(expected_offsets)
1824                .add_child_data(expected_data)
1825                .build()
1826                .unwrap();
1827            let expected_list_array = $list_array_type::from(expected_list_data);
1828
1829            assert_eq!(a, &expected_list_array);
1830        }};
1831    }
1832
1833    macro_rules! test_take_list_with_value_nulls {
1834        ($offset_type:ty, $list_data_type:ident, $list_array_type:ident) => {{
1835            // Construct a value array, [[0,null,0], [-1,-2,3], [null], [5,null]]
1836            let value_data = Int32Array::from(vec![
1837                Some(0),
1838                None,
1839                Some(0),
1840                Some(-1),
1841                Some(-2),
1842                Some(3),
1843                None,
1844                Some(5),
1845                None,
1846            ])
1847            .into_data();
1848            // Construct offsets
1849            let value_offsets: [$offset_type; 5] = [0, 3, 6, 7, 9];
1850            let value_offsets = Buffer::from_slice_ref(&value_offsets);
1851            // Construct a list array from the above two
1852            let list_data_type =
1853                DataType::$list_data_type(Arc::new(Field::new_list_field(DataType::Int32, true)));
1854            let list_data = ArrayData::builder(list_data_type.clone())
1855                .len(4)
1856                .add_buffer(value_offsets)
1857                .null_bit_buffer(Some(Buffer::from([0b11111111])))
1858                .add_child_data(value_data)
1859                .build()
1860                .unwrap();
1861            let list_array = $list_array_type::from(list_data);
1862
1863            // index returns: [[null], null, [-1,-2,3], [2,null], [0,null,0]]
1864            let index = UInt32Array::from(vec![Some(2), None, Some(1), Some(3), Some(0)]);
1865
1866            let a = take(&list_array, &index, None).unwrap();
1867            let a: &$list_array_type = a.as_any().downcast_ref::<$list_array_type>().unwrap();
1868
1869            // construct a value array with expected results:
1870            // [[null], null, [-1,-2,3], [5,null], [0,null,0]]
1871            let expected_data = Int32Array::from(vec![
1872                None,
1873                Some(-1),
1874                Some(-2),
1875                Some(3),
1876                Some(5),
1877                None,
1878                Some(0),
1879                None,
1880                Some(0),
1881            ])
1882            .into_data();
1883            // construct offsets
1884            let expected_offsets: [$offset_type; 6] = [0, 1, 1, 4, 6, 9];
1885            let expected_offsets = Buffer::from_slice_ref(&expected_offsets);
1886            // construct list array from the two
1887            let expected_list_data = ArrayData::builder(list_data_type)
1888                .len(5)
1889                // null buffer remains the same as only the indices have nulls
1890                .nulls(index.nulls().cloned())
1891                .add_buffer(expected_offsets)
1892                .add_child_data(expected_data)
1893                .build()
1894                .unwrap();
1895            let expected_list_array = $list_array_type::from(expected_list_data);
1896
1897            assert_eq!(a, &expected_list_array);
1898        }};
1899    }
1900
1901    macro_rules! test_take_list_with_nulls {
1902        ($offset_type:ty, $list_data_type:ident, $list_array_type:ident) => {{
1903            // Construct a value array, [[0,null,0], [-1,-2,3], null, [5,null]]
1904            let value_data = Int32Array::from(vec![
1905                Some(0),
1906                None,
1907                Some(0),
1908                Some(-1),
1909                Some(-2),
1910                Some(3),
1911                Some(5),
1912                None,
1913            ])
1914            .into_data();
1915            // Construct offsets
1916            let value_offsets: [$offset_type; 5] = [0, 3, 6, 6, 8];
1917            let value_offsets = Buffer::from_slice_ref(&value_offsets);
1918            // Construct a list array from the above two
1919            let list_data_type =
1920                DataType::$list_data_type(Arc::new(Field::new_list_field(DataType::Int32, true)));
1921            let list_data = ArrayData::builder(list_data_type.clone())
1922                .len(4)
1923                .add_buffer(value_offsets)
1924                .null_bit_buffer(Some(Buffer::from([0b11111011])))
1925                .add_child_data(value_data)
1926                .build()
1927                .unwrap();
1928            let list_array = $list_array_type::from(list_data);
1929
1930            // index returns: [null, null, [-1,-2,3], [5,null], [0,null,0]]
1931            let index = UInt32Array::from(vec![Some(2), None, Some(1), Some(3), Some(0)]);
1932
1933            let a = take(&list_array, &index, None).unwrap();
1934            let a: &$list_array_type = a.as_any().downcast_ref::<$list_array_type>().unwrap();
1935
1936            // construct a value array with expected results:
1937            // [null, null, [-1,-2,3], [5,null], [0,null,0]]
1938            let expected_data = Int32Array::from(vec![
1939                Some(-1),
1940                Some(-2),
1941                Some(3),
1942                Some(5),
1943                None,
1944                Some(0),
1945                None,
1946                Some(0),
1947            ])
1948            .into_data();
1949            // construct offsets
1950            let expected_offsets: [$offset_type; 6] = [0, 0, 0, 3, 5, 8];
1951            let expected_offsets = Buffer::from_slice_ref(&expected_offsets);
1952            // construct list array from the two
1953            let mut null_bits: [u8; 1] = [0; 1];
1954            bit_util::set_bit(&mut null_bits, 2);
1955            bit_util::set_bit(&mut null_bits, 3);
1956            bit_util::set_bit(&mut null_bits, 4);
1957            let expected_list_data = ArrayData::builder(list_data_type)
1958                .len(5)
1959                // null buffer must be recalculated as both values and indices have nulls
1960                .null_bit_buffer(Some(Buffer::from(null_bits)))
1961                .add_buffer(expected_offsets)
1962                .add_child_data(expected_data)
1963                .build()
1964                .unwrap();
1965            let expected_list_array = $list_array_type::from(expected_list_data);
1966
1967            assert_eq!(a, &expected_list_array);
1968        }};
1969    }
1970
1971    fn test_take_list_view_generic<OffsetType: OffsetSizeTrait, ValuesType: ArrowPrimitiveType, F>(
1972        values: Vec<Option<Vec<Option<ValuesType::Native>>>>,
1973        take_indices: Vec<Option<usize>>,
1974        expected: Vec<Option<Vec<Option<ValuesType::Native>>>>,
1975        mapper: F,
1976    ) where
1977        F: Fn(GenericListViewArray<OffsetType>) -> GenericListViewArray<OffsetType>,
1978    {
1979        let mut list_view_array =
1980            GenericListViewBuilder::<OffsetType, _>::new(PrimitiveBuilder::<ValuesType>::new());
1981
1982        for value in values {
1983            list_view_array.append_option(value);
1984        }
1985        let list_view_array = list_view_array.finish();
1986        let list_view_array = mapper(list_view_array);
1987
1988        let mut indices = UInt64Builder::new();
1989        for idx in take_indices {
1990            indices.append_option(idx.map(|i| i.to_u64().unwrap()));
1991        }
1992        let indices = indices.finish();
1993
1994        let taken = take(&list_view_array, &indices, None)
1995            .unwrap()
1996            .as_list_view()
1997            .clone();
1998
1999        let mut expected_array =
2000            GenericListViewBuilder::<OffsetType, _>::new(PrimitiveBuilder::<ValuesType>::new());
2001        for value in expected {
2002            expected_array.append_option(value);
2003        }
2004        let expected_array = expected_array.finish();
2005
2006        assert_eq!(taken, expected_array);
2007    }
2008
2009    macro_rules! list_view_test_case {
2010        (values: $values:expr, indices: $indices:expr, expected: $expected: expr) => {{
2011            test_take_list_view_generic::<i32, Int8Type, _>($values, $indices, $expected, |x| x);
2012            test_take_list_view_generic::<i64, Int8Type, _>($values, $indices, $expected, |x| x);
2013        }};
2014        (values: $values:expr, transform: $fn:expr, indices: $indices:expr, expected: $expected: expr) => {{
2015            test_take_list_view_generic::<i32, Int8Type, _>($values, $indices, $expected, $fn);
2016            test_take_list_view_generic::<i64, Int8Type, _>($values, $indices, $expected, $fn);
2017        }};
2018    }
2019
2020    fn do_take_fixed_size_list_test<T>(
2021        length: <Int32Type as ArrowPrimitiveType>::Native,
2022        input_data: Vec<Option<Vec<Option<T::Native>>>>,
2023        indices: Vec<<UInt32Type as ArrowPrimitiveType>::Native>,
2024        expected_data: Vec<Option<Vec<Option<T::Native>>>>,
2025    ) where
2026        T: ArrowPrimitiveType,
2027        PrimitiveArray<T>: From<Vec<Option<T::Native>>>,
2028    {
2029        let indices = UInt32Array::from(indices);
2030
2031        let input_array = FixedSizeListArray::from_iter_primitive::<T, _, _>(input_data, length);
2032
2033        let output = take_fixed_size_list(&input_array, &indices, length as u32).unwrap();
2034
2035        let expected = FixedSizeListArray::from_iter_primitive::<T, _, _>(expected_data, length);
2036
2037        assert_eq!(&output, &expected)
2038    }
2039
2040    #[test]
2041    fn test_take_list() {
2042        test_take_list!(i32, List, ListArray);
2043    }
2044
2045    #[test]
2046    fn test_take_large_list() {
2047        test_take_list!(i64, LargeList, LargeListArray);
2048    }
2049
2050    #[test]
2051    fn test_take_list_with_value_nulls() {
2052        test_take_list_with_value_nulls!(i32, List, ListArray);
2053    }
2054
2055    #[test]
2056    fn test_take_large_list_with_value_nulls() {
2057        test_take_list_with_value_nulls!(i64, LargeList, LargeListArray);
2058    }
2059
2060    #[test]
2061    fn test_test_take_list_with_nulls() {
2062        test_take_list_with_nulls!(i32, List, ListArray);
2063    }
2064
2065    #[test]
2066    fn test_test_take_large_list_with_nulls() {
2067        test_take_list_with_nulls!(i64, LargeList, LargeListArray);
2068    }
2069
2070    #[test]
2071    fn test_test_take_list_view_reversed() {
2072        // Take reversed indices
2073        list_view_test_case! {
2074            values: vec![
2075                Some(vec![Some(1), None, Some(3)]),
2076                None,
2077                Some(vec![Some(7), Some(8), None]),
2078            ],
2079            indices: vec![Some(2), Some(1), Some(0)],
2080            expected: vec![
2081                Some(vec![Some(7), Some(8), None]),
2082                None,
2083                Some(vec![Some(1), None, Some(3)]),
2084            ]
2085        }
2086    }
2087
2088    #[test]
2089    fn test_take_list_view_null_indices() {
2090        // Take with null indices
2091        list_view_test_case! {
2092            values: vec![
2093                Some(vec![Some(1), None, Some(3)]),
2094                None,
2095                Some(vec![Some(7), Some(8), None]),
2096            ],
2097            indices: vec![None, Some(0), None],
2098            expected: vec![None, Some(vec![Some(1), None, Some(3)]), None]
2099        }
2100    }
2101
2102    #[test]
2103    fn test_take_list_view_null_values() {
2104        // Take at null values
2105        list_view_test_case! {
2106            values: vec![
2107                Some(vec![Some(1), None, Some(3)]),
2108                None,
2109                Some(vec![Some(7), Some(8), None]),
2110            ],
2111            indices: vec![Some(1), Some(1), Some(1), None, None],
2112            expected: vec![None; 5]
2113        }
2114    }
2115
2116    #[test]
2117    fn test_take_list_view_sliced() {
2118        // Take null indices/values, with slicing.
2119        list_view_test_case! {
2120            values: vec![
2121                Some(vec![Some(1)]),
2122                None,
2123                None,
2124                Some(vec![Some(2), Some(3)]),
2125                Some(vec![Some(4), Some(5)]),
2126                None,
2127            ],
2128            transform: |l| l.slice(2, 4),
2129            indices: vec![Some(0), Some(3), None, Some(1), Some(2)],
2130            expected: vec![
2131                None, None, None, Some(vec![Some(2), Some(3)]), Some(vec![Some(4), Some(5)])
2132            ]
2133        }
2134    }
2135
2136    #[test]
2137    fn test_take_fixed_size_list() {
2138        do_take_fixed_size_list_test::<Int32Type>(
2139            3,
2140            vec![
2141                Some(vec![None, Some(1), Some(2)]),
2142                Some(vec![Some(3), Some(4), None]),
2143                Some(vec![Some(6), Some(7), Some(8)]),
2144            ],
2145            vec![2, 1, 0],
2146            vec![
2147                Some(vec![Some(6), Some(7), Some(8)]),
2148                Some(vec![Some(3), Some(4), None]),
2149                Some(vec![None, Some(1), Some(2)]),
2150            ],
2151        );
2152
2153        do_take_fixed_size_list_test::<UInt8Type>(
2154            1,
2155            vec![
2156                Some(vec![Some(1)]),
2157                Some(vec![Some(2)]),
2158                Some(vec![Some(3)]),
2159                Some(vec![Some(4)]),
2160                Some(vec![Some(5)]),
2161                Some(vec![Some(6)]),
2162                Some(vec![Some(7)]),
2163                Some(vec![Some(8)]),
2164            ],
2165            vec![2, 7, 0],
2166            vec![
2167                Some(vec![Some(3)]),
2168                Some(vec![Some(8)]),
2169                Some(vec![Some(1)]),
2170            ],
2171        );
2172
2173        do_take_fixed_size_list_test::<UInt64Type>(
2174            3,
2175            vec![
2176                Some(vec![Some(10), Some(11), Some(12)]),
2177                Some(vec![Some(13), Some(14), Some(15)]),
2178                None,
2179                Some(vec![Some(16), Some(17), Some(18)]),
2180            ],
2181            vec![3, 2, 1, 2, 0],
2182            vec![
2183                Some(vec![Some(16), Some(17), Some(18)]),
2184                None,
2185                Some(vec![Some(13), Some(14), Some(15)]),
2186                None,
2187                Some(vec![Some(10), Some(11), Some(12)]),
2188            ],
2189        );
2190    }
2191
2192    #[test]
2193    fn test_take_fixed_size_binary_with_nulls_indices() {
2194        let fsb = FixedSizeBinaryArray::try_from_sparse_iter_with_size(
2195            [
2196                Some(vec![0x01, 0x01, 0x01, 0x01]),
2197                Some(vec![0x02, 0x02, 0x02, 0x02]),
2198                Some(vec![0x03, 0x03, 0x03, 0x03]),
2199                Some(vec![0x04, 0x04, 0x04, 0x04]),
2200            ]
2201            .into_iter(),
2202            4,
2203        )
2204        .unwrap();
2205
2206        // The two middle indices are null -> Should be null in the output.
2207        let indices = UInt32Array::from(vec![Some(0), None, None, Some(3)]);
2208
2209        let result = take_fixed_size_binary(&fsb, &indices, 4).unwrap();
2210        assert_eq!(result.len(), 4);
2211        assert_eq!(result.null_count(), 2);
2212        assert_eq!(
2213            result.nulls().unwrap().iter().collect::<Vec<_>>(),
2214            vec![true, false, false, true]
2215        );
2216    }
2217
2218    /// The [`take_fixed_size_binary`] kernel contains optimizations that provide a faster
2219    /// implementation for commonly-used value lengths. This test uses a value length that is not
2220    /// optimized to test both code paths.
2221    #[test]
2222    fn test_take_fixed_size_binary_with_nulls_indices_not_optimized_length() {
2223        let fsb = FixedSizeBinaryArray::try_from_sparse_iter_with_size(
2224            [
2225                Some(vec![0x01, 0x01, 0x01, 0x01, 0x01]),
2226                Some(vec![0x02, 0x02, 0x02, 0x02, 0x01]),
2227                Some(vec![0x03, 0x03, 0x03, 0x03, 0x01]),
2228                Some(vec![0x04, 0x04, 0x04, 0x04, 0x01]),
2229            ]
2230            .into_iter(),
2231            5,
2232        )
2233        .unwrap();
2234
2235        // The two middle indices are null -> Should be null in the output.
2236        let indices = UInt32Array::from(vec![Some(0), None, None, Some(3)]);
2237
2238        let result = take_fixed_size_binary(&fsb, &indices, 5).unwrap();
2239        assert_eq!(result.len(), 4);
2240        assert_eq!(result.null_count(), 2);
2241        assert_eq!(
2242            result.nulls().unwrap().iter().collect::<Vec<_>>(),
2243            vec![true, false, false, true]
2244        );
2245    }
2246
2247    #[test]
2248    #[should_panic(expected = "index out of bounds: the len is 4 but the index is 1000")]
2249    fn test_take_list_out_of_bounds() {
2250        // Construct a value array, [[0,0,0], [-1,-2,-1], [2,3]]
2251        let value_data = Int32Array::from(vec![0, 0, 0, -1, -2, -1, 2, 3]).into_data();
2252        // Construct offsets
2253        let value_offsets = Buffer::from_slice_ref([0, 3, 6, 8]);
2254        // Construct a list array from the above two
2255        let list_data_type =
2256            DataType::List(Arc::new(Field::new_list_field(DataType::Int32, false)));
2257        let list_data = ArrayData::builder(list_data_type)
2258            .len(3)
2259            .add_buffer(value_offsets)
2260            .add_child_data(value_data)
2261            .build()
2262            .unwrap();
2263        let list_array = ListArray::from(list_data);
2264
2265        let index = UInt32Array::from(vec![1000]);
2266
2267        // A panic is expected here since we have not supplied the check_bounds
2268        // option.
2269        take(&list_array, &index, None).unwrap();
2270    }
2271
2272    #[test]
2273    fn test_take_map() {
2274        let values = Int32Array::from(vec![1, 2, 3, 4]);
2275        let array =
2276            MapArray::new_from_strings(vec!["a", "b", "c", "a"].into_iter(), &values, &[0, 3, 4])
2277                .unwrap();
2278
2279        let index = UInt32Array::from(vec![0]);
2280
2281        let result = take(&array, &index, None).unwrap();
2282        let expected: ArrayRef = Arc::new(
2283            MapArray::new_from_strings(
2284                vec!["a", "b", "c"].into_iter(),
2285                &values.slice(0, 3),
2286                &[0, 3],
2287            )
2288            .unwrap(),
2289        );
2290        assert_eq!(&expected, &result);
2291    }
2292
2293    #[test]
2294    fn test_take_struct() {
2295        let array = create_test_struct(vec![
2296            Some((Some(true), Some(42))),
2297            Some((Some(false), Some(28))),
2298            Some((Some(false), Some(19))),
2299            Some((Some(true), Some(31))),
2300            None,
2301        ]);
2302
2303        let index = UInt32Array::from(vec![0, 3, 1, 0, 2, 4]);
2304        let actual = take(&array, &index, None).unwrap();
2305        let actual: &StructArray = actual.as_any().downcast_ref::<StructArray>().unwrap();
2306        assert_eq!(index.len(), actual.len());
2307        assert_eq!(1, actual.null_count());
2308
2309        let expected = create_test_struct(vec![
2310            Some((Some(true), Some(42))),
2311            Some((Some(true), Some(31))),
2312            Some((Some(false), Some(28))),
2313            Some((Some(true), Some(42))),
2314            Some((Some(false), Some(19))),
2315            None,
2316        ]);
2317
2318        assert_eq!(&expected, actual);
2319
2320        let nulls = NullBuffer::from(&[false, true, false, true, false, true]);
2321        let empty_struct_arr = StructArray::new_empty_fields(6, Some(nulls));
2322        let index = UInt32Array::from(vec![0, 2, 1, 4]);
2323        let actual = take(&empty_struct_arr, &index, None).unwrap();
2324
2325        let expected_nulls = NullBuffer::from(&[false, false, true, false]);
2326        let expected_struct_arr = StructArray::new_empty_fields(4, Some(expected_nulls));
2327        assert_eq!(&expected_struct_arr, actual.as_struct());
2328    }
2329
2330    #[test]
2331    fn test_take_struct_with_null_indices() {
2332        let array = create_test_struct(vec![
2333            Some((Some(true), Some(42))),
2334            Some((Some(false), Some(28))),
2335            Some((Some(false), Some(19))),
2336            Some((Some(true), Some(31))),
2337            None,
2338        ]);
2339
2340        let index = UInt32Array::from(vec![None, Some(3), Some(1), None, Some(0), Some(4)]);
2341        let actual = take(&array, &index, None).unwrap();
2342        let actual: &StructArray = actual.as_any().downcast_ref::<StructArray>().unwrap();
2343        assert_eq!(index.len(), actual.len());
2344        assert_eq!(3, actual.null_count()); // 2 because of indices, 1 because of struct array
2345
2346        let expected = create_test_struct(vec![
2347            None,
2348            Some((Some(true), Some(31))),
2349            Some((Some(false), Some(28))),
2350            None,
2351            Some((Some(true), Some(42))),
2352            None,
2353        ]);
2354
2355        assert_eq!(&expected, actual);
2356    }
2357
2358    #[test]
2359    fn test_take_out_of_bounds() {
2360        let index = UInt32Array::from(vec![Some(3), None, Some(1), Some(3), Some(6)]);
2361        let take_opt = TakeOptions { check_bounds: true };
2362
2363        // int64
2364        let result = test_take_primitive_arrays::<Int64Type>(
2365            vec![Some(0), None, Some(2), Some(3), None],
2366            &index,
2367            Some(take_opt),
2368            vec![None],
2369        );
2370        assert!(result.is_err());
2371    }
2372
2373    #[test]
2374    #[should_panic(expected = "index out of bounds: the len is 4 but the index is 1000")]
2375    fn test_take_out_of_bounds_panic() {
2376        let index = UInt32Array::from(vec![Some(1000)]);
2377
2378        test_take_primitive_arrays::<Int64Type>(
2379            vec![Some(0), Some(1), Some(2), Some(3)],
2380            &index,
2381            None,
2382            vec![None],
2383        )
2384        .unwrap();
2385    }
2386
2387    #[test]
2388    fn test_null_array_smaller_than_indices() {
2389        let values = NullArray::new(2);
2390        let indices = UInt32Array::from(vec![Some(0), None, Some(15)]);
2391
2392        let result = take(&values, &indices, None).unwrap();
2393        let expected: ArrayRef = Arc::new(NullArray::new(3));
2394        assert_eq!(&result, &expected);
2395    }
2396
2397    #[test]
2398    fn test_null_array_larger_than_indices() {
2399        let values = NullArray::new(5);
2400        let indices = UInt32Array::from(vec![Some(0), None, Some(15)]);
2401
2402        let result = take(&values, &indices, None).unwrap();
2403        let expected: ArrayRef = Arc::new(NullArray::new(3));
2404        assert_eq!(&result, &expected);
2405    }
2406
2407    #[test]
2408    fn test_null_array_indices_out_of_bounds() {
2409        let values = NullArray::new(5);
2410        let indices = UInt32Array::from(vec![Some(0), None, Some(15)]);
2411
2412        let result = take(&values, &indices, Some(TakeOptions { check_bounds: true }));
2413        assert_eq!(
2414            result.unwrap_err().to_string(),
2415            "Compute error: Array index out of bounds, cannot get item at index 15 from 5 entries"
2416        );
2417    }
2418
2419    #[test]
2420    fn test_take_dict() {
2421        let mut dict_builder = StringDictionaryBuilder::<Int16Type>::new();
2422
2423        dict_builder.append("foo").unwrap();
2424        dict_builder.append("bar").unwrap();
2425        dict_builder.append("").unwrap();
2426        dict_builder.append_null();
2427        dict_builder.append("foo").unwrap();
2428        dict_builder.append("bar").unwrap();
2429        dict_builder.append("bar").unwrap();
2430        dict_builder.append("foo").unwrap();
2431
2432        let array = dict_builder.finish();
2433        let dict_values = array.values().clone();
2434        let dict_values = dict_values.as_any().downcast_ref::<StringArray>().unwrap();
2435
2436        let indices = UInt32Array::from(vec![
2437            Some(0), // first "foo"
2438            Some(7), // last "foo"
2439            None,    // null index should return null
2440            Some(5), // second "bar"
2441            Some(6), // another "bar"
2442            Some(2), // empty string
2443            Some(3), // input is null at this index
2444        ]);
2445
2446        let result = take(&array, &indices, None).unwrap();
2447        let result = result
2448            .as_any()
2449            .downcast_ref::<DictionaryArray<Int16Type>>()
2450            .unwrap();
2451
2452        let result_values: StringArray = result.values().to_data().into();
2453
2454        // dictionary values should stay the same
2455        let expected_values = StringArray::from(vec!["foo", "bar", ""]);
2456        assert_eq!(&expected_values, dict_values);
2457        assert_eq!(&expected_values, &result_values);
2458
2459        let expected_keys = Int16Array::from(vec![
2460            Some(0),
2461            Some(0),
2462            None,
2463            Some(1),
2464            Some(1),
2465            Some(2),
2466            None,
2467        ]);
2468        assert_eq!(result.keys(), &expected_keys);
2469    }
2470
2471    fn build_generic_list<S, T>(data: Vec<Option<Vec<T::Native>>>) -> GenericListArray<S>
2472    where
2473        S: OffsetSizeTrait + 'static,
2474        T: ArrowPrimitiveType,
2475        PrimitiveArray<T>: From<Vec<Option<T::Native>>>,
2476    {
2477        GenericListArray::from_iter_primitive::<T, _, _>(
2478            data.iter()
2479                .map(|x| x.as_ref().map(|x| x.iter().map(|x| Some(*x)))),
2480        )
2481    }
2482
2483    fn test_take_sliced_list_generic<S: OffsetSizeTrait + 'static>() {
2484        let list = build_generic_list::<S, Int32Type>(vec![
2485            Some(vec![0, 1]),
2486            Some(vec![2, 3, 4]),
2487            None,
2488            Some(vec![]),
2489            Some(vec![5, 6]),
2490            Some(vec![7]),
2491        ]);
2492        let sliced = list.slice(1, 4);
2493        let indices = UInt32Array::from(vec![Some(3), Some(0), None, Some(2), Some(1)]);
2494
2495        let taken = take(&sliced, &indices, None).unwrap();
2496        let taken = taken.as_list::<S>();
2497
2498        let expected = build_generic_list::<S, Int32Type>(vec![
2499            Some(vec![5, 6]),
2500            Some(vec![2, 3, 4]),
2501            None,
2502            Some(vec![]),
2503            None,
2504        ]);
2505
2506        assert_eq!(taken, &expected);
2507    }
2508
2509    fn test_take_sliced_list_with_value_nulls_generic<S: OffsetSizeTrait + 'static>() {
2510        let list = GenericListArray::<S>::from_iter_primitive::<Int32Type, _, _>(vec![
2511            Some(vec![Some(10)]),
2512            Some(vec![None, Some(1)]),
2513            None,
2514            Some(vec![Some(2), None]),
2515            Some(vec![]),
2516            Some(vec![Some(3)]),
2517        ]);
2518        let sliced = list.slice(1, 4);
2519        let indices = UInt32Array::from(vec![Some(2), Some(0), None, Some(3), Some(1)]);
2520
2521        let taken = take(&sliced, &indices, None).unwrap();
2522        let taken = taken.as_list::<S>();
2523
2524        let expected = GenericListArray::<S>::from_iter_primitive::<Int32Type, _, _>(vec![
2525            Some(vec![Some(2), None]),
2526            Some(vec![None, Some(1)]),
2527            None,
2528            Some(vec![]),
2529            None,
2530        ]);
2531
2532        assert_eq!(taken, &expected);
2533    }
2534
2535    #[test]
2536    fn test_take_sliced_list() {
2537        test_take_sliced_list_generic::<i32>();
2538    }
2539
2540    #[test]
2541    fn test_take_sliced_large_list() {
2542        test_take_sliced_list_generic::<i64>();
2543    }
2544
2545    #[test]
2546    fn test_take_sliced_list_with_value_nulls() {
2547        test_take_sliced_list_with_value_nulls_generic::<i32>();
2548    }
2549
2550    #[test]
2551    fn test_take_sliced_large_list_with_value_nulls() {
2552        test_take_sliced_list_with_value_nulls_generic::<i64>();
2553    }
2554
2555    #[test]
2556    fn test_take_runs() {
2557        let logical_array: Vec<i32> = vec![1_i32, 1, 2, 2, 1, 1, 1, 2, 2, 1, 1, 2, 2];
2558
2559        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
2560        builder.extend(logical_array.into_iter().map(Some));
2561        let run_array = builder.finish();
2562
2563        let take_indices: PrimitiveArray<Int32Type> =
2564            vec![7, 2, 3, 7, 11, 4, 6].into_iter().collect();
2565
2566        let take_out = take_run(&run_array, &take_indices).unwrap();
2567
2568        assert_eq!(take_out.len(), 7);
2569        assert_eq!(take_out.run_ends().len(), 7);
2570        assert_eq!(take_out.run_ends().values(), &[1_i32, 3, 4, 5, 7]);
2571
2572        let take_out_values = take_out.values().as_primitive::<Int32Type>();
2573        assert_eq!(take_out_values.values(), &[2, 2, 2, 2, 1]);
2574    }
2575
2576    #[test]
2577    fn test_take_runs_sliced() {
2578        let logical_array: Vec<i32> = vec![1, 1, 2, 2, 3, 3, 3, 4, 4, 5, 5, 6, 6];
2579
2580        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
2581        builder.extend(logical_array.into_iter().map(Some));
2582        let run_array = builder.finish();
2583
2584        let run_array = run_array.slice(4, 6); // [3, 3, 3, 4, 4, 5]
2585
2586        let take_indices: PrimitiveArray<Int32Type> = vec![0, 5, 5, 1, 4].into_iter().collect();
2587
2588        let result = take_run(&run_array, &take_indices).unwrap();
2589        let result = result.downcast::<Int32Array>().unwrap();
2590
2591        let expected = vec![3, 5, 5, 3, 4];
2592        let actual = result.into_iter().flatten().collect::<Vec<_>>();
2593
2594        assert_eq!(expected, actual);
2595    }
2596
2597    #[test]
2598    fn test_take_value_index_from_fixed_list() {
2599        let list = FixedSizeListArray::from_iter_primitive::<Int32Type, _, _>(
2600            vec![
2601                Some(vec![Some(1), Some(2), None]),
2602                Some(vec![Some(4), None, Some(6)]),
2603                None,
2604                Some(vec![None, Some(8), Some(9)]),
2605            ],
2606            3,
2607        );
2608
2609        let indices = UInt32Array::from(vec![2, 1, 0]);
2610        let indexed = take_value_indices_from_fixed_size_list(&list, &indices, 3).unwrap();
2611
2612        assert_eq!(indexed, UInt32Array::from(vec![6, 7, 8, 3, 4, 5, 0, 1, 2]));
2613
2614        let indices = UInt32Array::from(vec![3, 2, 1, 2, 0]);
2615        let indexed = take_value_indices_from_fixed_size_list(&list, &indices, 3).unwrap();
2616
2617        assert_eq!(
2618            indexed,
2619            UInt32Array::from(vec![9, 10, 11, 6, 7, 8, 3, 4, 5, 6, 7, 8, 0, 1, 2])
2620        );
2621    }
2622
2623    #[test]
2624    fn test_take_null_indices() {
2625        // Build indices with values that are out of bounds, but masked by null mask
2626        let indices = Int32Array::new(
2627            vec![1, 2, 400, 400].into(),
2628            Some(NullBuffer::from(vec![true, true, false, false])),
2629        );
2630        let values = Int32Array::from(vec![1, 23, 4, 5]);
2631        let r = take(&values, &indices, None).unwrap();
2632        let values = r
2633            .as_primitive::<Int32Type>()
2634            .into_iter()
2635            .collect::<Vec<_>>();
2636        assert_eq!(&values, &[Some(23), Some(4), None, None])
2637    }
2638
2639    #[test]
2640    fn test_take_fixed_size_list_null_indices() {
2641        let indices = Int32Array::from_iter([Some(0), None]);
2642        let values = Arc::new(Int32Array::from(vec![0, 1, 2, 3]));
2643        let arr_field = Arc::new(Field::new_list_field(values.data_type().clone(), true));
2644        let values = FixedSizeListArray::try_new(arr_field, 2, values, None).unwrap();
2645
2646        let r = take(&values, &indices, None).unwrap();
2647        let values = r
2648            .as_fixed_size_list()
2649            .values()
2650            .as_primitive::<Int32Type>()
2651            .into_iter()
2652            .collect::<Vec<_>>();
2653        assert_eq!(values, &[Some(0), Some(1), None, None])
2654    }
2655
2656    #[test]
2657    fn test_take_bytes_null_indices() {
2658        let indices = Int32Array::new(
2659            vec![0, 1, 400, 400].into(),
2660            Some(NullBuffer::from_iter(vec![true, true, false, false])),
2661        );
2662        let values = StringArray::from(vec![Some("foo"), None]);
2663        let r = take(&values, &indices, None).unwrap();
2664        let values = r.as_string::<i32>().iter().collect::<Vec<_>>();
2665        assert_eq!(&values, &[Some("foo"), None, None, None])
2666    }
2667
2668    #[test]
2669    fn test_take_union_sparse() {
2670        let structs = create_test_struct(vec![
2671            Some((Some(true), Some(42))),
2672            Some((Some(false), Some(28))),
2673            Some((Some(false), Some(19))),
2674            Some((Some(true), Some(31))),
2675            None,
2676        ]);
2677        let strings = StringArray::from(vec![Some("a"), None, Some("c"), None, Some("d")]);
2678        let type_ids = [1; 5].into_iter().collect::<ScalarBuffer<i8>>();
2679
2680        let union_fields = [
2681            (
2682                0,
2683                Arc::new(Field::new("f1", structs.data_type().clone(), true)),
2684            ),
2685            (
2686                1,
2687                Arc::new(Field::new("f2", strings.data_type().clone(), true)),
2688            ),
2689        ]
2690        .into_iter()
2691        .collect();
2692        let children = vec![Arc::new(structs) as Arc<dyn Array>, Arc::new(strings)];
2693        let array = UnionArray::try_new(union_fields, type_ids, None, children).unwrap();
2694
2695        let indices = vec![0, 3, 1, 0, 2, 4];
2696        let index = UInt32Array::from(indices.clone());
2697        let actual = take(&array, &index, None).unwrap();
2698        let actual = actual.as_any().downcast_ref::<UnionArray>().unwrap();
2699        let strings = actual.child(1);
2700        let strings = strings.as_any().downcast_ref::<StringArray>().unwrap();
2701
2702        let actual = strings.iter().collect::<Vec<_>>();
2703        let expected = vec![Some("a"), None, None, Some("a"), Some("c"), Some("d")];
2704        assert_eq!(expected, actual);
2705    }
2706
2707    #[test]
2708    fn test_take_union_dense() {
2709        let type_ids = vec![0, 1, 1, 0, 0, 1, 0];
2710        let offsets = vec![0, 0, 1, 1, 2, 2, 3];
2711        let ints = vec![10, 20, 30, 40];
2712        let strings = vec![Some("a"), None, Some("c"), Some("d")];
2713
2714        let indices = vec![0, 3, 1, 0, 2, 4];
2715
2716        let taken_type_ids = vec![0, 0, 1, 0, 1, 0];
2717        let taken_offsets = vec![0, 1, 0, 2, 1, 3];
2718        let taken_ints = vec![10, 20, 10, 30];
2719        let taken_strings = vec![Some("a"), None];
2720
2721        let type_ids = <ScalarBuffer<i8>>::from(type_ids);
2722        let offsets = <ScalarBuffer<i32>>::from(offsets);
2723        let ints = UInt32Array::from(ints);
2724        let strings = StringArray::from(strings);
2725
2726        let union_fields = [
2727            (
2728                0,
2729                Arc::new(Field::new("f1", ints.data_type().clone(), true)),
2730            ),
2731            (
2732                1,
2733                Arc::new(Field::new("f2", strings.data_type().clone(), true)),
2734            ),
2735        ]
2736        .into_iter()
2737        .collect();
2738
2739        let array = UnionArray::try_new(
2740            union_fields,
2741            type_ids,
2742            Some(offsets),
2743            vec![Arc::new(ints), Arc::new(strings)],
2744        )
2745        .unwrap();
2746
2747        let index = UInt32Array::from(indices);
2748
2749        let actual = take(&array, &index, None).unwrap();
2750        let actual = actual.as_any().downcast_ref::<UnionArray>().unwrap();
2751
2752        assert_eq!(actual.offsets(), Some(&ScalarBuffer::from(taken_offsets)));
2753        assert_eq!(actual.type_ids(), &ScalarBuffer::from(taken_type_ids));
2754        assert_eq!(
2755            UInt32Array::from(actual.child(0).to_data()),
2756            UInt32Array::from(taken_ints)
2757        );
2758        assert_eq!(
2759            StringArray::from(actual.child(1).to_data()),
2760            StringArray::from(taken_strings)
2761        );
2762    }
2763
2764    #[test]
2765    fn test_take_union_dense_using_builder() {
2766        let mut builder = UnionBuilder::new_dense();
2767
2768        builder.append::<Int32Type>("a", 1).unwrap();
2769        builder.append::<Float64Type>("b", 3.0).unwrap();
2770        builder.append::<Int32Type>("a", 4).unwrap();
2771        builder.append::<Int32Type>("a", 5).unwrap();
2772        builder.append::<Float64Type>("b", 2.0).unwrap();
2773
2774        let union = builder.build().unwrap();
2775
2776        let indices = UInt32Array::from(vec![2, 0, 1, 2]);
2777
2778        let mut builder = UnionBuilder::new_dense();
2779
2780        builder.append::<Int32Type>("a", 4).unwrap();
2781        builder.append::<Int32Type>("a", 1).unwrap();
2782        builder.append::<Float64Type>("b", 3.0).unwrap();
2783        builder.append::<Int32Type>("a", 4).unwrap();
2784
2785        let taken = builder.build().unwrap();
2786
2787        assert_eq!(
2788            taken.to_data(),
2789            take(&union, &indices, None).unwrap().to_data()
2790        );
2791    }
2792
2793    #[test]
2794    fn test_take_union_dense_all_match_issue_6206() {
2795        let fields = UnionFields::from_fields(vec![Field::new("a", DataType::Int64, false)]);
2796        let ints = Arc::new(Int64Array::from(vec![1, 2, 3, 4, 5]));
2797
2798        let array = UnionArray::try_new(
2799            fields,
2800            ScalarBuffer::from(vec![0_i8, 0, 0, 0, 0]),
2801            Some(ScalarBuffer::from_iter(0_i32..5)),
2802            vec![ints],
2803        )
2804        .unwrap();
2805
2806        let indicies = Int64Array::from(vec![0, 2, 4]);
2807        let array = take(&array, &indicies, None).unwrap();
2808        assert_eq!(array.len(), 3);
2809    }
2810
2811    #[test]
2812    fn test_take_bytes_offset_overflow() {
2813        let indices = Int32Array::from(vec![0; (i32::MAX >> 4) as usize]);
2814        let text = ('a'..='z').collect::<String>();
2815        let values = StringArray::from(vec![Some(text.clone())]);
2816        assert!(matches!(
2817            take(&values, &indices, None),
2818            Err(ArrowError::OffsetOverflowError(_))
2819        ));
2820    }
2821
2822    #[test]
2823    fn test_take_run_empty_indices() {
2824        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
2825        builder.extend([Some(1), Some(1), Some(2), Some(2)]);
2826        let run_array = builder.finish();
2827
2828        let logical_indices: PrimitiveArray<Int32Type> = PrimitiveArray::from(Vec::<i32>::new());
2829
2830        let result = take_impl(&run_array, &logical_indices).expect("take_run with empty indices");
2831
2832        // Verify the result is a valid empty RunArray
2833        assert_eq!(result.len(), 0);
2834        assert_eq!(result.null_count(), 0);
2835
2836        // Verify that the result can be downcast and used without validation errors
2837        // This specifically tests that "The values in run_ends array should be strictly positive" is not triggered
2838        let run_result = result
2839            .as_any()
2840            .downcast_ref::<RunArray<Int32Type>>()
2841            .expect("result should be a RunArray");
2842        assert_eq!(run_result.run_ends().len(), 0);
2843        assert_eq!(run_result.values().len(), 0);
2844    }
2845}