Skip to main content

arrow_select/
interleave.rs

1// Licensed to the Apache Software Foundation (ASF) under one
2// or more contributor license agreements.  See the NOTICE file
3// distributed with this work for additional information
4// regarding copyright ownership.  The ASF licenses this file
5// to you under the Apache License, Version 2.0 (the
6// "License"); you may not use this file except in compliance
7// with the License.  You may obtain a copy of the License at
8//
9//   http://www.apache.org/licenses/LICENSE-2.0
10//
11// Unless required by applicable law or agreed to in writing,
12// software distributed under the License is distributed on an
13// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14// KIND, either express or implied.  See the License for the
15// specific language governing permissions and limitations
16// under the License.
17
18//! Interleave elements from multiple arrays
19
20use crate::concat::concat;
21use crate::dictionary::{merge_dictionary_values, should_merge_dictionary_values};
22use arrow_array::builder::{BooleanBufferBuilder, PrimitiveBuilder};
23use arrow_array::cast::AsArray;
24use arrow_array::types::*;
25use arrow_array::*;
26use arrow_buffer::{ArrowNativeType, BooleanBuffer, MutableBuffer, NullBuffer, OffsetBuffer};
27use arrow_data::ByteView;
28use arrow_data::transform::MutableArrayData;
29use arrow_schema::{ArrowError, DataType, FieldRef, Fields};
30use std::sync::Arc;
31
/// Adapter invoked by `downcast_primitive!`: receives the resolved primitive
/// type plus the caller's bindings and forwards them to `interleave_primitive`.
macro_rules! primitive_helper {
    ($prim:ty, $arrays:ident, $pairs:ident, $dtype:ident) => {
        interleave_primitive::<$prim>($arrays, $pairs, $dtype)
    };
}
37
/// Adapter invoked by `downcast_integer!`: receives the resolved dictionary key
/// type and forwards the remaining arguments to `interleave_dictionaries`.
macro_rules! dict_helper {
    ($key:ty, $arrays:expr, $pairs:expr) => {
        interleave_dictionaries::<$key>($arrays, $pairs)
    };
}
43
44///
45/// Takes elements by index from a list of [`Array`], creating a new [`Array`] from those values.
46///
47/// Each element in `indices` is a pair of `usize` with the first identifying the index
48/// of the [`Array`] in `values`, and the second the index of the value within that [`Array`]
49///
50/// ```text
51/// ┌─────────────────┐      ┌─────────┐                                  ┌─────────────────┐
52/// │        A        │      │ (0, 0)  │        interleave(               │        A        │
53/// ├─────────────────┤      ├─────────┤          [values0, values1],     ├─────────────────┤
54/// │        D        │      │ (1, 0)  │          indices                 │        B        │
55/// └─────────────────┘      ├─────────┤        )                         ├─────────────────┤
56///   values array 0         │ (1, 1)  │      ─────────────────────────▶  │        C        │
57///                          ├─────────┤                                  ├─────────────────┤
58///                          │ (0, 1)  │                                  │        D        │
59///                          └─────────┘                                  └─────────────────┘
60/// ┌─────────────────┐       indices
61/// │        B        │        array
62/// ├─────────────────┤                                                    result
63/// │        C        │
64/// ├─────────────────┤
65/// │        E        │
66/// └─────────────────┘
67///   values array 1
68/// ```
69///
70/// For selecting values by index from a single array see [`crate::take`]
71pub fn interleave(
72    values: &[&dyn Array],
73    indices: &[(usize, usize)],
74) -> Result<ArrayRef, ArrowError> {
75    if values.is_empty() {
76        return Err(ArrowError::InvalidArgumentError(
77            "interleave requires input of at least one array".to_string(),
78        ));
79    }
80    let data_type = values[0].data_type();
81
82    for array in values.iter().skip(1) {
83        if array.data_type() != data_type {
84            return Err(ArrowError::InvalidArgumentError(format!(
85                "It is not possible to interleave arrays of different data types ({} and {})",
86                data_type,
87                array.data_type()
88            )));
89        }
90    }
91
92    if indices.is_empty() {
93        return Ok(new_empty_array(data_type));
94    }
95
96    downcast_primitive! {
97        data_type => (primitive_helper, values, indices, data_type),
98        DataType::Utf8 => interleave_bytes::<Utf8Type>(values, indices),
99        DataType::LargeUtf8 => interleave_bytes::<LargeUtf8Type>(values, indices),
100        DataType::Binary => interleave_bytes::<BinaryType>(values, indices),
101        DataType::LargeBinary => interleave_bytes::<LargeBinaryType>(values, indices),
102        DataType::BinaryView => interleave_views::<BinaryViewType>(values, indices),
103        DataType::Utf8View => interleave_views::<StringViewType>(values, indices),
104        DataType::Dictionary(k, _) => downcast_integer! {
105            k.as_ref() => (dict_helper, values, indices),
106            _ => unreachable!("illegal dictionary key type {k}")
107        },
108        DataType::Struct(fields) => interleave_struct(fields, values, indices),
109        DataType::List(field) => interleave_list::<i32>(values, indices, field),
110        DataType::LargeList(field) => interleave_list::<i64>(values, indices, field),
111        DataType::RunEndEncoded(r, _) => match r.data_type() {
112            DataType::Int16 => interleave_run_end::<Int16Type>(values, indices),
113            DataType::Int32 => interleave_run_end::<Int32Type>(values, indices),
114            DataType::Int64 => interleave_run_end::<Int64Type>(values, indices),
115            t => unreachable!("illegal run-end type {t}"),
116        },
117        DataType::ListView(field) => interleave_list_view::<i32>(values, indices, field),
118        DataType::LargeListView(field) => interleave_list_view::<i64>(values, indices, field),
119        _ => interleave_fallback(values, indices)
120    }
121}
122
123/// Common functionality for interleaving arrays
124///
125/// T is the concrete Array type
126struct Interleave<'a, T> {
127    /// The input arrays downcast to T
128    arrays: Vec<&'a T>,
129    /// The null buffer of the interleaved output
130    nulls: Option<NullBuffer>,
131}
132
133impl<'a, T: Array + 'static> Interleave<'a, T> {
134    fn new(values: &[&'a dyn Array], indices: &'a [(usize, usize)]) -> Self {
135        let mut has_nulls = false;
136        let arrays: Vec<&T> = values
137            .iter()
138            .map(|x| {
139                has_nulls = has_nulls || x.null_count() != 0;
140                x.as_any().downcast_ref().unwrap()
141            })
142            .collect();
143
144        let nulls = match has_nulls {
145            true => {
146                let nulls = BooleanBuffer::collect_bool(indices.len(), |i| {
147                    let (a, b) = indices[i];
148                    arrays[a].is_valid(b)
149                });
150                Some(nulls.into())
151            }
152            false => None,
153        };
154
155        Self { arrays, nulls }
156    }
157}
158
159fn interleave_primitive<T: ArrowPrimitiveType>(
160    values: &[&dyn Array],
161    indices: &[(usize, usize)],
162    data_type: &DataType,
163) -> Result<ArrayRef, ArrowError> {
164    let interleaved = Interleave::<'_, PrimitiveArray<T>>::new(values, indices);
165    let arrays = &interleaved.arrays;
166    let len = indices.len();
167
168    let mut output = Vec::with_capacity(len);
169    let dst: *mut T::Native = output.as_mut_ptr();
170    let mut base = 0;
171
172    // Process 8 elements at a time to issue multiple independent loads
173    // and increase memory-level parallelism for random access patterns.
174    let chunks = indices.chunks_exact(8);
175    let remainder = chunks.remainder();
176    for chunk in chunks {
177        let v0 = arrays[chunk[0].0].value(chunk[0].1);
178        let v1 = arrays[chunk[1].0].value(chunk[1].1);
179        let v2 = arrays[chunk[2].0].value(chunk[2].1);
180        let v3 = arrays[chunk[3].0].value(chunk[3].1);
181        let v4 = arrays[chunk[4].0].value(chunk[4].1);
182        let v5 = arrays[chunk[5].0].value(chunk[5].1);
183        let v6 = arrays[chunk[6].0].value(chunk[6].1);
184        let v7 = arrays[chunk[7].0].value(chunk[7].1);
185
186        // SAFETY: base+7 < len == output capacity
187        debug_assert!(base + 7 < len);
188        unsafe {
189            dst.add(base).write(v0);
190            dst.add(base + 1).write(v1);
191            dst.add(base + 2).write(v2);
192            dst.add(base + 3).write(v3);
193            dst.add(base + 4).write(v4);
194            dst.add(base + 5).write(v5);
195            dst.add(base + 6).write(v6);
196            dst.add(base + 7).write(v7);
197        }
198        base += 8;
199    }
200
201    for idx in remainder {
202        // SAFETY: base < len == output capacity
203        debug_assert!(base < len);
204        unsafe { dst.add(base).write(arrays[idx.0].value(idx.1)) };
205        base += 1;
206    }
207
208    // SAFETY: all `len` elements have been initialized
209    debug_assert!(base == len);
210    unsafe { output.set_len(len) };
211
212    let array = PrimitiveArray::<T>::try_new(output.into(), interleaved.nulls)?;
213    Ok(Arc::new(array.with_data_type(data_type.clone())))
214}
215
216fn interleave_bytes<T: ByteArrayType>(
217    values: &[&dyn Array],
218    indices: &[(usize, usize)],
219) -> Result<ArrayRef, ArrowError> {
220    let interleaved = Interleave::<'_, GenericByteArray<T>>::new(values, indices);
221
222    let mut capacity = 0;
223    let mut offsets = Vec::with_capacity(indices.len() + 1);
224    offsets.push(T::Offset::from_usize(0).unwrap());
225    for (a, b) in indices {
226        let o = interleaved.arrays[*a].value_offsets();
227        let element_len = o[*b + 1].as_usize() - o[*b].as_usize();
228        capacity += element_len;
229        offsets.push(
230            T::Offset::from_usize(capacity)
231                .ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
232        );
233    }
234
235    let mut values = Vec::with_capacity(capacity);
236    for (a, b) in indices {
237        values.extend_from_slice(interleaved.arrays[*a].value(*b).as_ref());
238    }
239
240    // Safety: safe by construction
241    let array = unsafe {
242        let offsets = OffsetBuffer::new_unchecked(offsets.into());
243        GenericByteArray::<T>::new_unchecked(offsets, values.into(), interleaved.nulls)
244    };
245    Ok(Arc::new(array))
246}
247
248fn interleave_dictionaries<K: ArrowDictionaryKeyType>(
249    arrays: &[&dyn Array],
250    indices: &[(usize, usize)],
251) -> Result<ArrayRef, ArrowError> {
252    let dictionaries: Vec<_> = arrays.iter().map(|x| x.as_dictionary::<K>()).collect();
253    let (should_merge, has_overflow) =
254        should_merge_dictionary_values::<K>(&dictionaries, indices.len());
255    if !should_merge {
256        return if has_overflow {
257            interleave_fallback(arrays, indices)
258        } else {
259            interleave_fallback_dictionary::<K>(&dictionaries, indices)
260        };
261    }
262
263    let masks: Vec<_> = dictionaries
264        .iter()
265        .enumerate()
266        .map(|(a_idx, dictionary)| {
267            let mut key_mask = BooleanBufferBuilder::new_from_buffer(
268                MutableBuffer::new_null(dictionary.len()),
269                dictionary.len(),
270            );
271
272            for (_, key_idx) in indices.iter().filter(|(a, _)| *a == a_idx) {
273                key_mask.set_bit(*key_idx, true);
274            }
275            key_mask.finish()
276        })
277        .collect();
278
279    let merged = merge_dictionary_values(&dictionaries, Some(&masks))?;
280
281    // Recompute keys
282    let mut keys = PrimitiveBuilder::<K>::with_capacity(indices.len());
283    for (a, b) in indices {
284        let old_keys: &PrimitiveArray<K> = dictionaries[*a].keys();
285        match old_keys.is_valid(*b) {
286            true => {
287                let old_key = old_keys.values()[*b];
288                keys.append_value(merged.key_mappings[*a][old_key.as_usize()])
289            }
290            false => keys.append_null(),
291        }
292    }
293    let array = unsafe { DictionaryArray::new_unchecked(keys.finish(), merged.values) };
294    Ok(Arc::new(array))
295}
296
297fn interleave_views<T: ByteViewType>(
298    values: &[&dyn Array],
299    indices: &[(usize, usize)],
300) -> Result<ArrayRef, ArrowError> {
301    let interleaved = Interleave::<'_, GenericByteViewArray<T>>::new(values, indices);
302    let mut buffers = Vec::new();
303
304    // Contains the offsets of start buffer in `buffer_to_new_index`
305    let mut offsets = Vec::with_capacity(interleaved.arrays.len() + 1);
306    offsets.push(0);
307    let mut total_buffers = 0;
308    for a in interleaved.arrays.iter() {
309        total_buffers += a.data_buffers().len();
310        offsets.push(total_buffers);
311    }
312
313    // contains the mapping from old buffer index to new buffer index
314    let mut buffer_to_new_index = vec![None; total_buffers];
315
316    let views: Vec<u128> = indices
317        .iter()
318        .map(|(array_idx, value_idx)| {
319            let array = interleaved.arrays[*array_idx];
320            let view = array.views().get(*value_idx).unwrap();
321            let view_len = *view as u32;
322            if view_len <= 12 {
323                return *view;
324            }
325            // value is big enough to be in a variadic buffer
326            let view = ByteView::from(*view);
327            let buffer_to_new_idx = offsets[*array_idx] + view.buffer_index as usize;
328            let new_buffer_idx: u32 =
329                *buffer_to_new_index[buffer_to_new_idx].get_or_insert_with(|| {
330                    buffers.push(array.data_buffers()[view.buffer_index as usize].clone());
331                    (buffers.len() - 1) as u32
332                });
333            view.with_buffer_index(new_buffer_idx).as_u128()
334        })
335        .collect();
336
337    let array = unsafe {
338        GenericByteViewArray::<T>::new_unchecked(views.into(), buffers, interleaved.nulls)
339    };
340    Ok(Arc::new(array))
341}
342
343fn interleave_struct(
344    fields: &Fields,
345    values: &[&dyn Array],
346    indices: &[(usize, usize)],
347) -> Result<ArrayRef, ArrowError> {
348    let interleaved = Interleave::<'_, StructArray>::new(values, indices);
349
350    if fields.is_empty() {
351        let array = StructArray::try_new_with_length(
352            fields.clone(),
353            vec![],
354            interleaved.nulls,
355            indices.len(),
356        )?;
357        return Ok(Arc::new(array));
358    }
359
360    let struct_fields_array: Result<Vec<_>, _> = (0..fields.len())
361        .map(|i| {
362            let field_values: Vec<&dyn Array> = interleaved
363                .arrays
364                .iter()
365                .map(|x| x.column(i).as_ref())
366                .collect();
367            interleave(&field_values, indices)
368        })
369        .collect();
370
371    let struct_array =
372        StructArray::try_new(fields.clone(), struct_fields_array?, interleaved.nulls)?;
373    Ok(Arc::new(struct_array))
374}
375
376fn interleave_list<O: OffsetSizeTrait>(
377    values: &[&dyn Array],
378    indices: &[(usize, usize)],
379    field: &FieldRef,
380) -> Result<ArrayRef, ArrowError> {
381    let interleaved = Interleave::<'_, GenericListArray<O>>::new(values, indices);
382
383    let mut capacity = 0usize;
384    let mut offsets = Vec::with_capacity(indices.len() + 1);
385    offsets.push(O::from_usize(0).unwrap());
386    for (array, row) in indices {
387        let o = interleaved.arrays[*array].value_offsets();
388        let element_len = o[*row + 1].as_usize() - o[*row].as_usize();
389        capacity += element_len;
390        offsets.push(
391            O::from_usize(capacity).ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
392        );
393    }
394
395    let mut child_indices = Vec::with_capacity(capacity);
396    for (array, row) in indices {
397        let list = interleaved.arrays[*array];
398        let start = list.value_offsets()[*row].as_usize();
399        let end = list.value_offsets()[*row + 1].as_usize();
400        child_indices.extend((start..end).map(|i| (*array, i)));
401    }
402
403    let child_arrays: Vec<&dyn Array> = interleaved
404        .arrays
405        .iter()
406        .map(|list| list.values().as_ref())
407        .collect();
408
409    let interleaved_values = interleave(&child_arrays, &child_indices)?;
410
411    let offsets = OffsetBuffer::new(offsets.into());
412    let list_array = GenericListArray::<O>::new(
413        field.clone(),
414        offsets,
415        interleaved_values,
416        interleaved.nulls,
417    );
418
419    Ok(Arc::new(list_array))
420}
421
422/// Specialized [`interleave`] for [`RunArray`].
423fn interleave_run_end<R: RunEndIndexType>(
424    values: &[&dyn Array],
425    indices: &[(usize, usize)],
426) -> Result<ArrayRef, ArrowError> {
427    if indices.is_empty() {
428        return Ok(new_empty_array(values[0].data_type()));
429    }
430
431    let n = indices.len();
432    R::Native::from_usize(n).ok_or_else(|| {
433        ArrowError::ComputeError(format!(
434            "interleave_run_end: output length {n} does not fit run-end type"
435        ))
436    })?;
437
438    let runs: Vec<&RunArray<R>> = values.iter().map(|a| a.as_run::<R>()).collect();
439    let value_arrays: Vec<&dyn Array> = runs.iter().map(|r| r.values().as_ref()).collect();
440
441    // Resolve each (array, logical_row) to (array, physical_row), so we can
442    // lookup physical indices by batch.
443    let mut phys_pairs: Vec<(usize, usize)> = vec![(0, 0); n];
444    let mut grouped: Vec<(Vec<R::Native>, Vec<usize>)> =
445        (0..runs.len()).map(|_| (Vec::new(), Vec::new())).collect();
446    for (out_pos, &(arr, row)) in indices.iter().enumerate() {
447        let row = R::Native::from_usize(row).ok_or_else(|| {
448            ArrowError::InvalidArgumentError(format!(
449                "interleave_run_end: row index {row} not representable as run-end type {}",
450                R::DATA_TYPE
451            ))
452        })?;
453        grouped[arr].0.push(row);
454        grouped[arr].1.push(out_pos);
455    }
456    for (arr_idx, (logical_rows, out_positions)) in grouped.into_iter().enumerate() {
457        let phys = runs[arr_idx].get_physical_indices(&logical_rows)?;
458        for (p, out_pos) in phys.iter().zip(out_positions.iter()) {
459            phys_pairs[*out_pos] = (arr_idx, *p);
460        }
461    }
462
463    // Coalesce by physical-pair equality only: emit a new run when the
464    // (array_idx, physical_idx) pair changes between adjacent output rows.
465    // TODO: We could perform an equality check across sources to extend the
466    // output run, but we can't call make_comparator from this crate.
467    let mut run_ends_buf: Vec<R::Native> = Vec::with_capacity(n);
468    let mut dedup_pairs: Vec<(usize, usize)> = Vec::with_capacity(n);
469    dedup_pairs.push(phys_pairs[0]);
470    for i in 1..n {
471        if phys_pairs[i] != phys_pairs[i - 1] {
472            run_ends_buf.push(R::Native::from_usize(i).unwrap());
473            dedup_pairs.push(phys_pairs[i]);
474        }
475    }
476    run_ends_buf.push(R::Native::from_usize(n).unwrap());
477
478    let taken_values = interleave(&value_arrays, &dedup_pairs)?;
479    let run_ends = PrimitiveArray::<R>::from_iter_values(run_ends_buf);
480
481    Ok(Arc::new(RunArray::<R>::try_new(
482        &run_ends,
483        taken_values.as_ref(),
484    )?))
485}
486
487fn interleave_list_view<O: OffsetSizeTrait>(
488    values: &[&dyn Array],
489    indices: &[(usize, usize)],
490    field: &FieldRef,
491) -> Result<ArrayRef, ArrowError> {
492    let interleaved = Interleave::<'_, GenericListViewArray<O>>::new(values, indices);
493
494    // Pick whichever strategy produces fewer child elements:
495    // - Per-row copy: total = sum of selected sizes. Better for sparse selections.
496    // - Concat + offset adjustment: total = sum of source backing array lengths.
497    //   Better when rows share backing elements via overlapping offset/size ranges.
498    let concat_cost: usize = interleaved.arrays.iter().map(|lv| lv.values().len()).sum();
499    let per_row_cost: usize = indices
500        .iter()
501        .map(|&(a, r)| interleaved.arrays[a].sizes()[r].as_usize())
502        .sum();
503
504    if per_row_cost <= concat_cost {
505        interleave_list_view_copy::<O>(&interleaved, indices, field)
506    } else {
507        interleave_list_view_concat::<O>(&interleaved, indices, field)
508    }
509}
510
511/// Per-row copy: copies each selected row's child elements into a new flat array.
512fn interleave_list_view_copy<O: OffsetSizeTrait>(
513    interleaved: &Interleave<'_, GenericListViewArray<O>>,
514    indices: &[(usize, usize)],
515    field: &FieldRef,
516) -> Result<ArrayRef, ArrowError> {
517    let mut capacity = 0usize;
518    let mut offsets = Vec::with_capacity(indices.len());
519    let mut sizes = Vec::with_capacity(indices.len());
520    for &(array_idx, row_idx) in indices {
521        let list = interleaved.arrays[array_idx];
522        let size = list.sizes()[row_idx].as_usize();
523        offsets.push(
524            O::from_usize(capacity).ok_or_else(|| ArrowError::OffsetOverflowError(capacity))?,
525        );
526        sizes.push(O::from_usize(size).ok_or_else(|| ArrowError::OffsetOverflowError(size))?);
527        capacity += size;
528    }
529
530    let child_data: Vec<_> = interleaved
531        .arrays
532        .iter()
533        .map(|list| list.values().to_data())
534        .collect();
535    let child_data_refs: Vec<_> = child_data.iter().collect();
536    let mut mutable_child = MutableArrayData::new(child_data_refs, false, capacity);
537    for &(array_idx, row_idx) in indices {
538        let list = interleaved.arrays[array_idx];
539        let start = list.offsets()[row_idx].as_usize();
540        let size = list.sizes()[row_idx].as_usize();
541        if size > 0 {
542            mutable_child.extend(array_idx, start, start + size);
543        }
544    }
545
546    Ok(Arc::new(GenericListViewArray::<O>::new(
547        field.clone(),
548        offsets.into(),
549        sizes.into(),
550        make_array(mutable_child.freeze()),
551        interleaved.nulls.clone(),
552    )))
553}
554
555/// Concat backing arrays: concatenates all source value arrays and adjusts offsets.
556/// Preserves within-source element sharing.
557fn interleave_list_view_concat<O: OffsetSizeTrait>(
558    interleaved: &Interleave<'_, GenericListViewArray<O>>,
559    indices: &[(usize, usize)],
560    field: &FieldRef,
561) -> Result<ArrayRef, ArrowError> {
562    let child_arrays: Vec<&dyn Array> = interleaved
563        .arrays
564        .iter()
565        .map(|lv| lv.values().as_ref())
566        .collect();
567    let mut base_offsets = Vec::with_capacity(interleaved.arrays.len());
568    let mut running = 0usize;
569    for lv in &interleaved.arrays {
570        base_offsets.push(running);
571        running += lv.values().len();
572    }
573    let combined_values = concat(&child_arrays)?;
574
575    let mut new_offsets = Vec::with_capacity(indices.len());
576    let mut new_sizes = Vec::with_capacity(indices.len());
577    for &(array_idx, row_idx) in indices {
578        let lv = interleaved.arrays[array_idx];
579        let adjusted = lv.offsets()[row_idx].as_usize() + base_offsets[array_idx];
580        new_offsets.push(
581            O::from_usize(adjusted).ok_or_else(|| ArrowError::OffsetOverflowError(adjusted))?,
582        );
583        new_sizes.push(lv.sizes()[row_idx]);
584    }
585
586    Ok(Arc::new(GenericListViewArray::<O>::new(
587        field.clone(),
588        new_offsets.into(),
589        new_sizes.into(),
590        combined_values,
591        interleaved.nulls.clone(),
592    )))
593}
594
595/// Fallback implementation of interleave using [`MutableArrayData`]
596fn interleave_fallback(
597    values: &[&dyn Array],
598    indices: &[(usize, usize)],
599) -> Result<ArrayRef, ArrowError> {
600    let arrays: Vec<_> = values.iter().map(|x| x.to_data()).collect();
601    let arrays: Vec<_> = arrays.iter().collect();
602    let mut array_data = MutableArrayData::new(arrays, false, indices.len());
603
604    let mut cur_array = indices[0].0;
605    let mut start_row_idx = indices[0].1;
606    let mut end_row_idx = start_row_idx + 1;
607
608    for (array, row) in indices.iter().skip(1).copied() {
609        if array == cur_array && row == end_row_idx {
610            // subsequent row in same batch
611            end_row_idx += 1;
612            continue;
613        }
614
615        // emit current batch of rows for current buffer
616        array_data.extend(cur_array, start_row_idx, end_row_idx);
617
618        // start new batch of rows
619        cur_array = array;
620        start_row_idx = row;
621        end_row_idx = start_row_idx + 1;
622    }
623
624    // emit final batch of rows
625    array_data.extend(cur_array, start_row_idx, end_row_idx);
626    Ok(make_array(array_data.freeze()))
627}
628
629/// Fallback implementation for interleaving dictionaries when it was determined
630/// that the dictionary values should not be merged. This implementation concatenates
631/// the value slices and recomputes the resulting dictionary keys.
632///
633/// # Panics
634///
635/// This function assumes that the combined dictionary values will not overflow the
636/// key type. Callers must verify this condition [`should_merge_dictionary_values`]
637/// before calling this function.
638fn interleave_fallback_dictionary<K: ArrowDictionaryKeyType>(
639    dictionaries: &[&DictionaryArray<K>],
640    indices: &[(usize, usize)],
641) -> Result<ArrayRef, ArrowError> {
642    let relative_offsets: Vec<usize> = dictionaries
643        .iter()
644        .scan(0usize, |offset, dict| {
645            let current = *offset;
646            *offset += dict.values().len();
647            Some(current)
648        })
649        .collect();
650    let all_values: Vec<&dyn Array> = dictionaries.iter().map(|d| d.values().as_ref()).collect();
651    let concatenated_values = concat(&all_values)?;
652
653    let any_nulls = dictionaries.iter().any(|d| d.keys().nulls().is_some());
654    let (new_keys, nulls) = if any_nulls {
655        let mut has_nulls = false;
656        let new_keys: Vec<K::Native> = indices
657            .iter()
658            .map(|(array, row)| {
659                let old_keys = dictionaries[*array].keys();
660                if old_keys.is_valid(*row) {
661                    let old_key = old_keys.values()[*row].as_usize();
662                    K::Native::from_usize(relative_offsets[*array] + old_key)
663                        .expect("key overflow should be checked by caller")
664                } else {
665                    has_nulls = true;
666                    K::Native::ZERO
667                }
668            })
669            .collect();
670
671        let nulls = if has_nulls {
672            let null_buffer = BooleanBuffer::collect_bool(indices.len(), |i| {
673                let (array, row) = indices[i];
674                dictionaries[array].keys().is_valid(row)
675            });
676            Some(NullBuffer::new(null_buffer))
677        } else {
678            None
679        };
680        (new_keys, nulls)
681    } else {
682        let new_keys: Vec<K::Native> = indices
683            .iter()
684            .map(|(array, row)| {
685                let old_key = dictionaries[*array].keys().values()[*row].as_usize();
686                K::Native::from_usize(relative_offsets[*array] + old_key)
687                    .expect("key overflow should be checked by caller")
688            })
689            .collect();
690        (new_keys, None)
691    };
692
693    let keys_array = PrimitiveArray::<K>::new(new_keys.into(), nulls);
694    // SAFETY: keys_array is constructed from a valid set of keys.
695    let array = unsafe { DictionaryArray::new_unchecked(keys_array, concatenated_values) };
696    Ok(Arc::new(array))
697}
698
699/// Interleave rows by index from multiple [`RecordBatch`] instances and return a new [`RecordBatch`].
700///
701/// This function will call [`interleave`] on each array of the [`RecordBatch`] instances and assemble a new [`RecordBatch`].
702///
703/// # Example
704/// ```
705/// # use std::sync::Arc;
706/// # use arrow_array::{StringArray, Int32Array, RecordBatch, UInt32Array};
707/// # use arrow_schema::{DataType, Field, Schema};
708/// # use arrow_select::interleave::interleave_record_batch;
709///
710/// let schema = Arc::new(Schema::new(vec![
711///     Field::new("a", DataType::Int32, true),
712///     Field::new("b", DataType::Utf8, true),
713/// ]));
714///
715/// let batch1 = RecordBatch::try_new(
716///     schema.clone(),
717///     vec![
718///         Arc::new(Int32Array::from(vec![0, 1, 2])),
719///         Arc::new(StringArray::from(vec!["a", "b", "c"])),
720///     ],
721/// ).unwrap();
722///
723/// let batch2 = RecordBatch::try_new(
724///     schema.clone(),
725///     vec![
726///         Arc::new(Int32Array::from(vec![3, 4, 5])),
727///         Arc::new(StringArray::from(vec!["d", "e", "f"])),
728///     ],
729/// ).unwrap();
730///
731/// let indices = vec![(0, 1), (1, 2), (0, 0), (1, 1)];
732/// let interleaved = interleave_record_batch(&[&batch1, &batch2], &indices).unwrap();
733///
734/// let expected = RecordBatch::try_new(
735///     schema,
736///     vec![
737///         Arc::new(Int32Array::from(vec![1, 5, 0, 4])),
738///         Arc::new(StringArray::from(vec!["b", "f", "a", "e"])),
739///     ],
740/// ).unwrap();
741/// assert_eq!(interleaved, expected);
742/// ```
743pub fn interleave_record_batch(
744    record_batches: &[&RecordBatch],
745    indices: &[(usize, usize)],
746) -> Result<RecordBatch, ArrowError> {
747    let schema = record_batches[0].schema();
748    let columns = (0..schema.fields().len())
749        .map(|i| {
750            let column_values: Vec<&dyn Array> = record_batches
751                .iter()
752                .map(|batch| batch.column(i).as_ref())
753                .collect();
754            interleave(&column_values, indices)
755        })
756        .collect::<Result<Vec<_>, _>>()?;
757    RecordBatch::try_new(schema, columns)
758}
759
760#[cfg(test)]
761mod tests {
762    use super::*;
763    use arrow_array::Int32RunArray;
764    use arrow_array::builder::{GenericListBuilder, Int32Builder, PrimitiveRunBuilder};
765    use arrow_array::types::Int8Type;
766    use arrow_buffer::ScalarBuffer;
767    use arrow_schema::Field;
768
769    #[test]
770    fn test_primitive() {
771        let a = Int32Array::from_iter_values([1, 2, 3, 4]);
772        let b = Int32Array::from_iter_values([5, 6, 7]);
773        let c = Int32Array::from_iter_values([8, 9, 10]);
774        let values = interleave(&[&a, &b, &c], &[(0, 3), (0, 3), (2, 2), (2, 0), (1, 1)]).unwrap();
775        let v = values.as_primitive::<Int32Type>();
776        assert_eq!(v.values(), &[4, 4, 10, 8, 6]);
777    }
778
779    #[test]
780    fn test_primitive_nulls() {
781        let a = Int32Array::from_iter_values([1, 2, 3, 4]);
782        let b = Int32Array::from_iter([Some(1), Some(4), None]);
783        let values = interleave(&[&a, &b], &[(0, 1), (1, 2), (1, 2), (0, 3), (0, 2)]).unwrap();
784        let v: Vec<_> = values.as_primitive::<Int32Type>().into_iter().collect();
785        assert_eq!(&v, &[Some(2), None, None, Some(4), Some(3)])
786    }
787
788    #[test]
789    fn test_primitive_empty() {
790        let a = Int32Array::from_iter_values([1, 2, 3, 4]);
791        let v = interleave(&[&a], &[]).unwrap();
792        assert!(v.is_empty());
793        assert_eq!(v.data_type(), &DataType::Int32);
794    }
795
796    #[test]
797    fn test_strings() {
798        let a = StringArray::from_iter_values(["a", "b", "c"]);
799        let b = StringArray::from_iter_values(["hello", "world", "foo"]);
800        let values = interleave(&[&a, &b], &[(0, 2), (0, 2), (1, 0), (1, 1), (0, 1)]).unwrap();
801        let v = values.as_string::<i32>();
802        let values: Vec<_> = v.into_iter().collect();
803        assert_eq!(
804            &values,
805            &[
806                Some("c"),
807                Some("c"),
808                Some("hello"),
809                Some("world"),
810                Some("b")
811            ]
812        )
813    }
814
815    #[test]
816    fn test_interleave_dictionary() {
817        let a = DictionaryArray::<Int32Type>::from_iter(["a", "b", "c", "a", "b"]);
818        let b = DictionaryArray::<Int32Type>::from_iter(["a", "c", "a", "c", "a"]);
819
820        // Should not recompute dictionary
821        let values =
822            interleave(&[&a, &b], &[(0, 2), (0, 2), (0, 2), (1, 0), (1, 1), (0, 1)]).unwrap();
823        let v = values.as_dictionary::<Int32Type>();
824        assert_eq!(v.values().len(), 5);
825
826        let vc = v.downcast_dict::<StringArray>().unwrap();
827        let collected: Vec<_> = vc.into_iter().map(Option::unwrap).collect();
828        assert_eq!(&collected, &["c", "c", "c", "a", "c", "b"]);
829
830        // Should recompute dictionary
831        let values = interleave(&[&a, &b], &[(0, 2), (0, 2), (1, 1)]).unwrap();
832        let v = values.as_dictionary::<Int32Type>();
833        assert_eq!(v.values().len(), 1);
834
835        let vc = v.downcast_dict::<StringArray>().unwrap();
836        let collected: Vec<_> = vc.into_iter().map(Option::unwrap).collect();
837        assert_eq!(&collected, &["c", "c", "c"]);
838    }
839
840    #[test]
841    fn test_interleave_dictionary_nulls() {
842        let input_1_keys = Int32Array::from_iter_values([0, 2, 1, 3]);
843        let input_1_values = StringArray::from(vec![Some("foo"), None, Some("bar"), Some("fiz")]);
844        let input_1 = DictionaryArray::new(input_1_keys, Arc::new(input_1_values));
845        let input_2: DictionaryArray<Int32Type> = vec![None].into_iter().collect();
846
847        let expected = vec![Some("fiz"), None, None, Some("foo")];
848
849        let values = interleave(
850            &[&input_1 as _, &input_2 as _],
851            &[(0, 3), (0, 2), (1, 0), (0, 0)],
852        )
853        .unwrap();
854        let dictionary = values.as_dictionary::<Int32Type>();
855        let actual: Vec<Option<&str>> = dictionary
856            .downcast_dict::<StringArray>()
857            .unwrap()
858            .into_iter()
859            .collect();
860
861        assert_eq!(actual, expected);
862    }
863
864    #[test]
865    fn test_interleave_dictionary_overflow_same_values() {
866        let values: ArrayRef = Arc::new(StringArray::from_iter_values(
867            (0..50).map(|i| format!("v{i}")),
868        ));
869
870        // With 3 dictionaries of 50 values each, relative_offsets = [0, 50, 100]
871        // Accessing key 49 from dict3 gives 100 + 49 = 149 which overflows Int8
872        // (max 127).
873        // This test case falls back to interleave_fallback because the
874        // dictionaries share the same underlying values slice.
875        let dict1 = DictionaryArray::<Int8Type>::new(
876            Int8Array::from_iter_values([0, 1, 2]),
877            values.clone(),
878        );
879        let dict2 = DictionaryArray::<Int8Type>::new(
880            Int8Array::from_iter_values([0, 1, 2]),
881            values.clone(),
882        );
883        let dict3 =
884            DictionaryArray::<Int8Type>::new(Int8Array::from_iter_values([49]), values.clone());
885
886        let indices = &[(0, 0), (1, 0), (2, 0)];
887        let result = interleave(&[&dict1, &dict2, &dict3], indices).unwrap();
888
889        let dict_result = result.as_dictionary::<Int8Type>();
890        let string_result: Vec<_> = dict_result
891            .downcast_dict::<StringArray>()
892            .unwrap()
893            .into_iter()
894            .map(|x| x.unwrap())
895            .collect();
896        assert_eq!(string_result, vec!["v0", "v0", "v49"]);
897    }
898
899    fn test_interleave_lists<O: OffsetSizeTrait>() {
900        // [[1, 2], null, [3]]
901        let mut a = GenericListBuilder::<O, _>::new(Int32Builder::new());
902        a.values().append_value(1);
903        a.values().append_value(2);
904        a.append(true);
905        a.append(false);
906        a.values().append_value(3);
907        a.append(true);
908        let a = a.finish();
909
910        // [[4], null, [5, 6, null]]
911        let mut b = GenericListBuilder::<O, _>::new(Int32Builder::new());
912        b.values().append_value(4);
913        b.append(true);
914        b.append(false);
915        b.values().append_value(5);
916        b.values().append_value(6);
917        b.values().append_null();
918        b.append(true);
919        let b = b.finish();
920
921        let values = interleave(&[&a, &b], &[(0, 2), (0, 1), (1, 0), (1, 2), (1, 1)]).unwrap();
922        let v = values
923            .as_any()
924            .downcast_ref::<GenericListArray<O>>()
925            .unwrap();
926
927        // [[3], null, [4], [5, 6, null], null]
928        let mut expected = GenericListBuilder::<O, _>::new(Int32Builder::new());
929        expected.values().append_value(3);
930        expected.append(true);
931        expected.append(false);
932        expected.values().append_value(4);
933        expected.append(true);
934        expected.values().append_value(5);
935        expected.values().append_value(6);
936        expected.values().append_null();
937        expected.append(true);
938        expected.append(false);
939        let expected = expected.finish();
940
941        assert_eq!(v, &expected);
942    }
943
944    #[test]
945    fn test_lists() {
946        test_interleave_lists::<i32>();
947    }
948
949    #[test]
950    fn test_large_lists() {
951        test_interleave_lists::<i64>();
952    }
953
954    fn test_interleave_list_views<O: OffsetSizeTrait>() {
955        // [[1, 2], null, [3]]
956        let mut a = GenericListBuilder::<O, _>::new(Int32Builder::new());
957        a.values().append_value(1);
958        a.values().append_value(2);
959        a.append(true);
960        a.append(false);
961        a.values().append_value(3);
962        a.append(true);
963        let a: GenericListViewArray<O> = a.finish().into();
964
965        // [[4], null, [5, 6, null]]
966        let mut b = GenericListBuilder::<O, _>::new(Int32Builder::new());
967        b.values().append_value(4);
968        b.append(true);
969        b.append(false);
970        b.values().append_value(5);
971        b.values().append_value(6);
972        b.values().append_null();
973        b.append(true);
974        let b: GenericListViewArray<O> = b.finish().into();
975
976        let values = interleave(&[&a, &b], &[(0, 2), (0, 1), (1, 0), (1, 2), (1, 1)]).unwrap();
977        let v = values
978            .as_any()
979            .downcast_ref::<GenericListViewArray<O>>()
980            .unwrap();
981
982        // [[3], null, [4], [5, 6, null], null]
983        let mut expected = GenericListBuilder::<O, _>::new(Int32Builder::new());
984        expected.values().append_value(3);
985        expected.append(true);
986        expected.append(false);
987        expected.values().append_value(4);
988        expected.append(true);
989        expected.values().append_value(5);
990        expected.values().append_value(6);
991        expected.values().append_null();
992        expected.append(true);
993        expected.append(false);
994        let expected: GenericListViewArray<O> = expected.finish().into();
995
996        assert_eq!(v, &expected);
997    }
998
999    #[test]
1000    fn test_list_views() {
1001        test_interleave_list_views::<i32>();
1002    }
1003
1004    #[test]
1005    fn test_large_list_views() {
1006        test_interleave_list_views::<i64>();
1007    }
1008
1009    #[test]
1010    fn test_interleave_list_view_overlapping() {
1011        let field = Arc::new(Field::new_list_field(DataType::Int64, false));
1012
1013        // lv_a: 10 rows, two groups of 5 sharing the same backing elements.
1014        //   rows 0-4 → offset 0, size 5 → [0,1,2,3,4]
1015        //   rows 5-9 → offset 5, size 5 → [5,6,7,8,9]
1016        let lv_a = ListViewArray::new(
1017            Arc::clone(&field),
1018            ScalarBuffer::from(vec![0i32, 0, 0, 0, 0, 5, 5, 5, 5, 5]),
1019            ScalarBuffer::from(vec![5i32; 10]),
1020            Arc::new(Int64Array::from_iter_values(0..10)),
1021            None,
1022        );
1023
1024        // lv_b: 8 rows, two groups of 4 sharing the same backing elements.
1025        //   rows 0-3 → offset 0, size 3 → [100,101,102]
1026        //   rows 4-7 → offset 3, size 3 → [103,104,105]
1027        let lv_b = ListViewArray::new(
1028            Arc::clone(&field),
1029            ScalarBuffer::from(vec![0i32, 0, 0, 0, 3, 3, 3, 3]),
1030            ScalarBuffer::from(vec![3i32; 8]),
1031            Arc::new(Int64Array::from_iter_values(100..106)),
1032            None,
1033        );
1034
1035        let indices: Vec<(usize, usize)> = vec![
1036            (0, 0),
1037            (1, 0),
1038            (0, 5),
1039            (1, 4),
1040            (0, 1),
1041            (1, 1),
1042            (0, 6),
1043            (1, 5),
1044        ];
1045        let result = interleave(&[&lv_a as &dyn Array, &lv_b as &dyn Array], &indices).unwrap();
1046        result
1047            .to_data()
1048            .validate_full()
1049            .expect("result must be valid");
1050
1051        let result_lv = result.as_list_view::<i32>();
1052        assert_eq!(result_lv.len(), 8);
1053        assert_eq!(
1054            result_lv.value(0).as_primitive::<Int64Type>().values(),
1055            &[0, 1, 2, 3, 4]
1056        );
1057        assert_eq!(
1058            result_lv.value(1).as_primitive::<Int64Type>().values(),
1059            &[100, 101, 102]
1060        );
1061        assert_eq!(
1062            result_lv.value(2).as_primitive::<Int64Type>().values(),
1063            &[5, 6, 7, 8, 9]
1064        );
1065        assert_eq!(
1066            result_lv.value(3).as_primitive::<Int64Type>().values(),
1067            &[103, 104, 105]
1068        );
1069
1070        // Backing elements = sum of source arrays (10 + 6 = 16), not per-row
1071        // expansion (8 rows × avg ~4 = 32). Overlapping sharing is preserved.
1072        let total_input_elements = lv_a.values().len() + lv_b.values().len();
1073        assert_eq!(result_lv.values().len(), total_input_elements);
1074    }
1075
1076    #[test]
1077    fn test_struct_without_nulls() {
1078        let fields = Fields::from(vec![
1079            Field::new("number_col", DataType::Int32, false),
1080            Field::new("string_col", DataType::Utf8, false),
1081        ]);
1082        let a = {
1083            let number_col = Int32Array::from_iter_values([1, 2, 3, 4]);
1084            let string_col = StringArray::from_iter_values(["a", "b", "c", "d"]);
1085
1086            StructArray::try_new(
1087                fields.clone(),
1088                vec![Arc::new(number_col), Arc::new(string_col)],
1089                None,
1090            )
1091            .unwrap()
1092        };
1093
1094        let b = {
1095            let number_col = Int32Array::from_iter_values([5, 6, 7]);
1096            let string_col = StringArray::from_iter_values(["hello", "world", "foo"]);
1097
1098            StructArray::try_new(
1099                fields.clone(),
1100                vec![Arc::new(number_col), Arc::new(string_col)],
1101                None,
1102            )
1103            .unwrap()
1104        };
1105
1106        let c = {
1107            let number_col = Int32Array::from_iter_values([8, 9, 10]);
1108            let string_col = StringArray::from_iter_values(["x", "y", "z"]);
1109
1110            StructArray::try_new(
1111                fields.clone(),
1112                vec![Arc::new(number_col), Arc::new(string_col)],
1113                None,
1114            )
1115            .unwrap()
1116        };
1117
1118        let values = interleave(&[&a, &b, &c], &[(0, 3), (0, 3), (2, 2), (2, 0), (1, 1)]).unwrap();
1119        let values_struct = values.as_struct();
1120        assert_eq!(values_struct.data_type(), &DataType::Struct(fields));
1121        assert_eq!(values_struct.null_count(), 0);
1122
1123        let values_number = values_struct.column(0).as_primitive::<Int32Type>();
1124        assert_eq!(values_number.values(), &[4, 4, 10, 8, 6]);
1125        let values_string = values_struct.column(1).as_string::<i32>();
1126        let values_string: Vec<_> = values_string.into_iter().collect();
1127        assert_eq!(
1128            &values_string,
1129            &[Some("d"), Some("d"), Some("z"), Some("x"), Some("world")]
1130        );
1131    }
1132
1133    #[test]
1134    fn test_struct_with_nulls_in_values() {
1135        let fields = Fields::from(vec![
1136            Field::new("number_col", DataType::Int32, true),
1137            Field::new("string_col", DataType::Utf8, true),
1138        ]);
1139        let a = {
1140            let number_col = Int32Array::from_iter_values([1, 2, 3, 4]);
1141            let string_col = StringArray::from_iter_values(["a", "b", "c", "d"]);
1142
1143            StructArray::try_new(
1144                fields.clone(),
1145                vec![Arc::new(number_col), Arc::new(string_col)],
1146                None,
1147            )
1148            .unwrap()
1149        };
1150
1151        let b = {
1152            let number_col = Int32Array::from_iter([Some(1), Some(4), None]);
1153            let string_col = StringArray::from(vec![Some("hello"), None, Some("foo")]);
1154
1155            StructArray::try_new(
1156                fields.clone(),
1157                vec![Arc::new(number_col), Arc::new(string_col)],
1158                None,
1159            )
1160            .unwrap()
1161        };
1162
1163        let values = interleave(&[&a, &b], &[(0, 1), (1, 2), (1, 2), (0, 3), (1, 1)]).unwrap();
1164        let values_struct = values.as_struct();
1165        assert_eq!(values_struct.data_type(), &DataType::Struct(fields));
1166
1167        // The struct itself has no nulls, but the values do
1168        assert_eq!(values_struct.null_count(), 0);
1169
1170        let values_number: Vec<_> = values_struct
1171            .column(0)
1172            .as_primitive::<Int32Type>()
1173            .into_iter()
1174            .collect();
1175        assert_eq!(values_number, &[Some(2), None, None, Some(4), Some(4)]);
1176
1177        let values_string = values_struct.column(1).as_string::<i32>();
1178        let values_string: Vec<_> = values_string.into_iter().collect();
1179        assert_eq!(
1180            &values_string,
1181            &[Some("b"), Some("foo"), Some("foo"), Some("d"), None]
1182        );
1183    }
1184
1185    #[test]
1186    fn test_struct_with_nulls() {
1187        let fields = Fields::from(vec![
1188            Field::new("number_col", DataType::Int32, false),
1189            Field::new("string_col", DataType::Utf8, false),
1190        ]);
1191        let a = {
1192            let number_col = Int32Array::from_iter_values([1, 2, 3, 4]);
1193            let string_col = StringArray::from_iter_values(["a", "b", "c", "d"]);
1194
1195            StructArray::try_new(
1196                fields.clone(),
1197                vec![Arc::new(number_col), Arc::new(string_col)],
1198                None,
1199            )
1200            .unwrap()
1201        };
1202
1203        let b = {
1204            let number_col = Int32Array::from_iter_values([5, 6, 7]);
1205            let string_col = StringArray::from_iter_values(["hello", "world", "foo"]);
1206
1207            StructArray::try_new(
1208                fields.clone(),
1209                vec![Arc::new(number_col), Arc::new(string_col)],
1210                Some(NullBuffer::from(&[true, false, true])),
1211            )
1212            .unwrap()
1213        };
1214
1215        let c = {
1216            let number_col = Int32Array::from_iter_values([8, 9, 10]);
1217            let string_col = StringArray::from_iter_values(["x", "y", "z"]);
1218
1219            StructArray::try_new(
1220                fields.clone(),
1221                vec![Arc::new(number_col), Arc::new(string_col)],
1222                None,
1223            )
1224            .unwrap()
1225        };
1226
1227        let values = interleave(&[&a, &b, &c], &[(0, 3), (0, 3), (2, 2), (1, 1), (2, 0)]).unwrap();
1228        let values_struct = values.as_struct();
1229        assert_eq!(values_struct.data_type(), &DataType::Struct(fields));
1230
1231        let validity: Vec<bool> = {
1232            let null_buffer = values_struct.nulls().expect("should_have_nulls");
1233
1234            null_buffer.iter().collect()
1235        };
1236        assert_eq!(validity, &[true, true, true, false, true]);
1237        let values_number = values_struct.column(0).as_primitive::<Int32Type>();
1238        assert_eq!(values_number.values(), &[4, 4, 10, 6, 8]);
1239        let values_string = values_struct.column(1).as_string::<i32>();
1240        let values_string: Vec<_> = values_string.into_iter().collect();
1241        assert_eq!(
1242            &values_string,
1243            &[Some("d"), Some("d"), Some("z"), Some("world"), Some("x"),]
1244        );
1245    }
1246
1247    #[test]
1248    fn test_struct_empty() {
1249        let fields = Fields::from(vec![
1250            Field::new("number_col", DataType::Int32, false),
1251            Field::new("string_col", DataType::Utf8, false),
1252        ]);
1253        let a = {
1254            let number_col = Int32Array::from_iter_values([1, 2, 3, 4]);
1255            let string_col = StringArray::from_iter_values(["a", "b", "c", "d"]);
1256
1257            StructArray::try_new(
1258                fields.clone(),
1259                vec![Arc::new(number_col), Arc::new(string_col)],
1260                None,
1261            )
1262            .unwrap()
1263        };
1264        let v = interleave(&[&a], &[]).unwrap();
1265        assert!(v.is_empty());
1266        assert_eq!(v.data_type(), &DataType::Struct(fields));
1267    }
1268
1269    #[test]
1270    fn interleave_sparse_nulls() {
1271        let values = StringArray::from_iter_values((0..100).map(|x| x.to_string()));
1272        let keys = Int32Array::from_iter_values(0..10);
1273        let dict_a = DictionaryArray::new(keys, Arc::new(values));
1274        let values = StringArray::new_null(0);
1275        let keys = Int32Array::new_null(10);
1276        let dict_b = DictionaryArray::new(keys, Arc::new(values));
1277
1278        let indices = &[(0, 0), (0, 1), (0, 2), (1, 0)];
1279        let array = interleave(&[&dict_a, &dict_b], indices).unwrap();
1280
1281        let expected =
1282            DictionaryArray::<Int32Type>::from_iter(vec![Some("0"), Some("1"), Some("2"), None]);
1283        assert_eq!(array.as_ref(), &expected)
1284    }
1285
1286    #[test]
1287    fn test_interleave_views() {
1288        let values = StringArray::from_iter_values([
1289            "hello",
1290            "world_long_string_not_inlined",
1291            "foo",
1292            "bar",
1293            "baz",
1294        ]);
1295        let view_a = StringViewArray::from(&values);
1296
1297        let values = StringArray::from_iter_values([
1298            "test",
1299            "data",
1300            "more_long_string_not_inlined",
1301            "views",
1302            "here",
1303        ]);
1304        let view_b = StringViewArray::from(&values);
1305
1306        let indices = &[
1307            (0, 2), // "foo"
1308            (1, 0), // "test"
1309            (0, 4), // "baz"
1310            (1, 3), // "views"
1311            (0, 1), // "world_long_string_not_inlined"
1312        ];
1313
1314        // Test specialized implementation
1315        let values = interleave(&[&view_a, &view_b], indices).unwrap();
1316        let result = values.as_string_view();
1317        assert_eq!(result.data_buffers().len(), 1);
1318
1319        let fallback = interleave_fallback(&[&view_a, &view_b], indices).unwrap();
1320        let fallback_result = fallback.as_string_view();
1321        // note that fallback_result has 2 buffers, but only one long enough string to warrant a buffer
1322        assert_eq!(fallback_result.data_buffers().len(), 2);
1323
1324        // Convert to strings for easier assertion
1325        let collected: Vec<_> = result.iter().map(|x| x.map(|s| s.to_string())).collect();
1326
1327        let fallback_collected: Vec<_> = fallback_result
1328            .iter()
1329            .map(|x| x.map(|s| s.to_string()))
1330            .collect();
1331
1332        assert_eq!(&collected, &fallback_collected);
1333
1334        assert_eq!(
1335            &collected,
1336            &[
1337                Some("foo".to_string()),
1338                Some("test".to_string()),
1339                Some("baz".to_string()),
1340                Some("views".to_string()),
1341                Some("world_long_string_not_inlined".to_string()),
1342            ]
1343        );
1344    }
1345
1346    #[test]
1347    fn test_interleave_views_with_nulls() {
1348        let values = StringArray::from_iter([
1349            Some("hello"),
1350            None,
1351            Some("foo_long_string_not_inlined"),
1352            Some("bar"),
1353            None,
1354        ]);
1355        let view_a = StringViewArray::from(&values);
1356
1357        let values = StringArray::from_iter([
1358            Some("test"),
1359            Some("data_long_string_not_inlined"),
1360            None,
1361            None,
1362            Some("here"),
1363        ]);
1364        let view_b = StringViewArray::from(&values);
1365
1366        let indices = &[
1367            (0, 1), // null
1368            (1, 2), // null
1369            (0, 2), // "foo_long_string_not_inlined"
1370            (1, 3), // null
1371            (0, 4), // null
1372        ];
1373
1374        // Test specialized implementation
1375        let values = interleave(&[&view_a, &view_b], indices).unwrap();
1376        let result = values.as_string_view();
1377        assert_eq!(result.data_buffers().len(), 1);
1378
1379        let fallback = interleave_fallback(&[&view_a, &view_b], indices).unwrap();
1380        let fallback_result = fallback.as_string_view();
1381
1382        // Convert to strings for easier assertion
1383        let collected: Vec<_> = result.iter().map(|x| x.map(|s| s.to_string())).collect();
1384
1385        let fallback_collected: Vec<_> = fallback_result
1386            .iter()
1387            .map(|x| x.map(|s| s.to_string()))
1388            .collect();
1389
1390        assert_eq!(&collected, &fallback_collected);
1391
1392        assert_eq!(
1393            &collected,
1394            &[
1395                None,
1396                None,
1397                Some("foo_long_string_not_inlined".to_string()),
1398                None,
1399                None,
1400            ]
1401        );
1402    }
1403
1404    #[test]
1405    fn test_interleave_views_multiple_buffers() {
1406        let str1 = "very_long_string_from_first_buffer".as_bytes();
1407        let str2 = "very_long_string_from_second_buffer".as_bytes();
1408        let buffer1 = str1.to_vec().into();
1409        let buffer2 = str2.to_vec().into();
1410
1411        let view1 = ByteView::new(str1.len() as u32, &str1[..4])
1412            .with_buffer_index(0)
1413            .with_offset(0)
1414            .as_u128();
1415        let view2 = ByteView::new(str2.len() as u32, &str2[..4])
1416            .with_buffer_index(1)
1417            .with_offset(0)
1418            .as_u128();
1419        let view_a =
1420            StringViewArray::try_new(vec![view1, view2].into(), vec![buffer1, buffer2], None)
1421                .unwrap();
1422
1423        let str3 = "another_very_long_string_buffer_three".as_bytes();
1424        let str4 = "different_long_string_in_buffer_four".as_bytes();
1425        let buffer3 = str3.to_vec().into();
1426        let buffer4 = str4.to_vec().into();
1427
1428        let view3 = ByteView::new(str3.len() as u32, &str3[..4])
1429            .with_buffer_index(0)
1430            .with_offset(0)
1431            .as_u128();
1432        let view4 = ByteView::new(str4.len() as u32, &str4[..4])
1433            .with_buffer_index(1)
1434            .with_offset(0)
1435            .as_u128();
1436        let view_b =
1437            StringViewArray::try_new(vec![view3, view4].into(), vec![buffer3, buffer4], None)
1438                .unwrap();
1439
1440        let indices = &[
1441            (0, 0), // String from first buffer of array A
1442            (1, 0), // String from first buffer of array B
1443            (0, 1), // String from second buffer of array A
1444            (1, 1), // String from second buffer of array B
1445            (0, 0), // String from first buffer of array A again
1446            (1, 1), // String from second buffer of array B again
1447        ];
1448
1449        // Test interleave
1450        let values = interleave(&[&view_a, &view_b], indices).unwrap();
1451        let result = values.as_string_view();
1452
1453        assert_eq!(
1454            result.data_buffers().len(),
1455            4,
1456            "Expected four buffers (two from each input array)"
1457        );
1458
1459        let result_strings: Vec<_> = result.iter().map(|x| x.map(|s| s.to_string())).collect();
1460        assert_eq!(
1461            result_strings,
1462            vec![
1463                Some("very_long_string_from_first_buffer".to_string()),
1464                Some("another_very_long_string_buffer_three".to_string()),
1465                Some("very_long_string_from_second_buffer".to_string()),
1466                Some("different_long_string_in_buffer_four".to_string()),
1467                Some("very_long_string_from_first_buffer".to_string()),
1468                Some("different_long_string_in_buffer_four".to_string()),
1469            ]
1470        );
1471
1472        let views = result.views();
1473        let buffer_indices: Vec<_> = views
1474            .iter()
1475            .map(|raw_view| ByteView::from(*raw_view).buffer_index)
1476            .collect();
1477
1478        assert_eq!(
1479            buffer_indices,
1480            vec![
1481                0, // First buffer from array A
1482                1, // First buffer from array B
1483                2, // Second buffer from array A
1484                3, // Second buffer from array B
1485                0, // First buffer from array A (reused)
1486                3, // Second buffer from array B (reused)
1487            ]
1488        );
1489    }
1490
1491    #[test]
1492    fn test_interleave_run_end_encoded_primitive() {
1493        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
1494        builder.extend([1, 1, 2, 2, 2, 3].into_iter().map(Some));
1495        let a = builder.finish();
1496
1497        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
1498        builder.extend([4, 5, 5, 6, 6, 6].into_iter().map(Some));
1499        let b = builder.finish();
1500
1501        let indices = &[(0, 1), (1, 0), (0, 4), (1, 2), (0, 5)];
1502        let result = interleave(&[&a, &b], indices).unwrap();
1503
1504        // The result should be a RunEndEncoded array
1505        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
1506
1507        // Cast to RunArray to access values
1508        let result_run_array: &Int32RunArray = result.as_any().downcast_ref().unwrap();
1509
1510        // Verify the logical values by accessing the logical array directly
1511        let expected = vec![1, 4, 2, 5, 3];
1512        let mut actual = Vec::new();
1513        for i in 0..result_run_array.len() {
1514            let physical_idx = result_run_array.get_physical_index(i);
1515            let value = result_run_array
1516                .values()
1517                .as_primitive::<Int32Type>()
1518                .value(physical_idx);
1519            actual.push(value);
1520        }
1521        assert_eq!(actual, expected);
1522    }
1523
1524    #[test]
1525    fn test_interleave_run_end_encoded_sliced() {
1526        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
1527        builder.extend([1, 1, 2, 2, 2, 3].into_iter().map(Some));
1528        let a = builder.finish();
1529        let a = a.slice(2, 3); // [2, 2, 2]
1530
1531        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
1532        builder.extend([4, 5, 5, 6, 6, 6].into_iter().map(Some));
1533        let b = builder.finish();
1534        let b = b.slice(1, 3); // [5, 5, 6]
1535
1536        let indices = &[(0, 1), (1, 0), (0, 2), (1, 1), (1, 2)];
1537        let result = interleave(&[&a, &b], indices).unwrap();
1538
1539        let result = result.as_run::<Int32Type>();
1540        let result = result.downcast::<Int32Array>().unwrap();
1541
1542        let expected = vec![2, 5, 2, 5, 6];
1543        let actual = result.into_iter().flatten().collect::<Vec<_>>();
1544        assert_eq!(actual, expected);
1545    }
1546
1547    #[test]
1548    fn test_interleave_run_end_encoded_string() {
1549        let a: Int32RunArray = vec!["hello", "hello", "world", "world", "foo"]
1550            .into_iter()
1551            .collect();
1552        let b: Int32RunArray = vec!["bar", "baz", "baz", "qux"].into_iter().collect();
1553
1554        let indices = &[(0, 0), (1, 1), (0, 3), (1, 3), (0, 4)];
1555        let result = interleave(&[&a, &b], indices).unwrap();
1556
1557        // The result should be a RunEndEncoded array
1558        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
1559
1560        // Cast to RunArray to access values
1561        let result_run_array: &Int32RunArray = result.as_any().downcast_ref().unwrap();
1562
1563        // Verify the logical values by accessing the logical array directly
1564        let expected = vec!["hello", "baz", "world", "qux", "foo"];
1565        let mut actual = Vec::new();
1566        for i in 0..result_run_array.len() {
1567            let physical_idx = result_run_array.get_physical_index(i);
1568            let value = result_run_array
1569                .values()
1570                .as_string::<i32>()
1571                .value(physical_idx);
1572            actual.push(value);
1573        }
1574        assert_eq!(actual, expected);
1575    }
1576
1577    #[test]
1578    fn test_interleave_run_end_encoded_with_nulls() {
1579        let a: Int32RunArray = vec![Some("a"), Some("a"), None, None, Some("b")]
1580            .into_iter()
1581            .collect();
1582        let b: Int32RunArray = vec![None, Some("c"), Some("c"), Some("d")]
1583            .into_iter()
1584            .collect();
1585
1586        let indices = &[(0, 1), (1, 0), (0, 2), (1, 3), (0, 4)];
1587        let result = interleave(&[&a, &b], indices).unwrap();
1588
1589        // The result should be a RunEndEncoded array
1590        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
1591
1592        // Cast to RunArray to access values
1593        let result_run_array: &Int32RunArray = result.as_any().downcast_ref().unwrap();
1594
1595        // Verify the logical values by accessing the logical array directly
1596        let expected = vec![Some("a"), None, None, Some("d"), Some("b")];
1597        let mut actual = Vec::new();
1598        for i in 0..result_run_array.len() {
1599            let physical_idx = result_run_array.get_physical_index(i);
1600            if result_run_array.values().is_null(physical_idx) {
1601                actual.push(None);
1602            } else {
1603                let value = result_run_array
1604                    .values()
1605                    .as_string::<i32>()
1606                    .value(physical_idx);
1607                actual.push(Some(value));
1608            }
1609        }
1610        assert_eq!(actual, expected);
1611    }
1612
1613    #[test]
1614    fn test_interleave_run_end_encoded_different_run_types() {
1615        let mut builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
1616        builder.extend([1, 1, 2, 3, 3].into_iter().map(Some));
1617        let a = builder.finish();
1618
1619        let mut builder = PrimitiveRunBuilder::<Int16Type, Int32Type>::new();
1620        builder.extend([4, 5, 5, 6].into_iter().map(Some));
1621        let b = builder.finish();
1622
1623        let indices = &[(0, 0), (1, 1), (0, 3), (1, 3)];
1624        let result = interleave(&[&a, &b], indices).unwrap();
1625
1626        // The result should be a RunEndEncoded array
1627        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
1628
1629        // Cast to RunArray to access values
1630        let result_run_array: &RunArray<Int16Type> = result.as_any().downcast_ref().unwrap();
1631
1632        // Verify the logical values by accessing the logical array directly
1633        let expected = vec![1, 5, 3, 6];
1634        let mut actual = Vec::new();
1635        for i in 0..result_run_array.len() {
1636            let physical_idx = result_run_array.get_physical_index(i);
1637            let value = result_run_array
1638                .values()
1639                .as_primitive::<Int32Type>()
1640                .value(physical_idx);
1641            actual.push(value);
1642        }
1643        assert_eq!(actual, expected);
1644    }
1645
1646    #[test]
1647    fn test_interleave_run_end_encoded_mixed_run_lengths() {
1648        let mut builder = PrimitiveRunBuilder::<Int64Type, Int32Type>::new();
1649        builder.extend([1, 2, 2, 2, 2, 3, 3, 4].into_iter().map(Some));
1650        let a = builder.finish();
1651
1652        let mut builder = PrimitiveRunBuilder::<Int64Type, Int32Type>::new();
1653        builder.extend([5, 5, 5, 6, 7, 7, 8, 8].into_iter().map(Some));
1654        let b = builder.finish();
1655
1656        let indices = &[
1657            (0, 0), // 1
1658            (1, 2), // 5
1659            (0, 3), // 2
1660            (1, 3), // 6
1661            (0, 6), // 3
1662            (1, 6), // 8
1663            (0, 7), // 4
1664            (1, 4), // 7
1665        ];
1666        let result = interleave(&[&a, &b], indices).unwrap();
1667
1668        // The result should be a RunEndEncoded array
1669        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
1670
1671        // Cast to RunArray to access values
1672        let result_run_array: &RunArray<Int64Type> = result.as_any().downcast_ref().unwrap();
1673
1674        // Verify the logical values by accessing the logical array directly
1675        let expected = vec![1, 5, 2, 6, 3, 8, 4, 7];
1676        let mut actual = Vec::new();
1677        for i in 0..result_run_array.len() {
1678            let physical_idx = result_run_array.get_physical_index(i);
1679            let value = result_run_array
1680                .values()
1681                .as_primitive::<Int32Type>()
1682                .value(physical_idx);
1683            actual.push(value);
1684        }
1685        assert_eq!(actual, expected);
1686    }
1687
1688    #[test]
1689    fn test_interleave_run_end_encoded_empty_runs() {
1690        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
1691        builder.extend([1].into_iter().map(Some));
1692        let a = builder.finish();
1693
1694        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
1695        builder.extend([2, 2, 2].into_iter().map(Some));
1696        let b = builder.finish();
1697
1698        let indices = &[(0, 0), (1, 1), (1, 2)];
1699        let result = interleave(&[&a, &b], indices).unwrap();
1700
1701        // The result should be a RunEndEncoded array
1702        assert!(matches!(result.data_type(), DataType::RunEndEncoded(_, _)));
1703
1704        // Cast to RunArray to access values
1705        let result_run_array: &Int32RunArray = result.as_any().downcast_ref().unwrap();
1706
1707        // Verify the logical values by accessing the logical array directly
1708        let expected = vec![1, 2, 2];
1709        let mut actual = Vec::new();
1710        for i in 0..result_run_array.len() {
1711            let physical_idx = result_run_array.get_physical_index(i);
1712            let value = result_run_array
1713                .values()
1714                .as_primitive::<Int32Type>()
1715                .value(physical_idx);
1716            actual.push(value);
1717        }
1718        assert_eq!(actual, expected);
1719    }
1720
1721    #[test]
1722    fn test_struct_no_fields() {
1723        let fields = Fields::empty();
1724        let a = StructArray::try_new_with_length(fields.clone(), vec![], None, 10).unwrap();
1725        let v = interleave(&[&a], &[(0, 0)]).unwrap();
1726        assert_eq!(v.len(), 1);
1727        assert_eq!(v.data_type(), &DataType::Struct(fields));
1728    }
1729
1730    #[test]
1731    fn test_interleave_fallback_dictionary_with_nulls() {
1732        let input_1_keys = Int32Array::from_iter([Some(0), None, Some(1)]);
1733        let input_1_values = StringArray::from_iter_values(["foo", "bar"]);
1734        let dict_a = DictionaryArray::new(input_1_keys, Arc::new(input_1_values));
1735
1736        let input_2_keys = Int32Array::from_iter([Some(0), Some(1), None]);
1737        let input_2_values = StringArray::from_iter_values(["baz", "qux"]);
1738        let dict_b = DictionaryArray::new(input_2_keys, Arc::new(input_2_values));
1739
1740        let indices = vec![
1741            (0, 0), // "foo"
1742            (0, 1), // null
1743            (1, 0), // "baz"
1744            (1, 2), // null
1745            (0, 2), // "bar"
1746            (1, 1), // "qux"
1747        ];
1748
1749        let result =
1750            interleave_fallback_dictionary::<Int32Type>(&[&dict_a, &dict_b], &indices).unwrap();
1751        let dict_result = result.as_dictionary::<Int32Type>();
1752
1753        let string_result = dict_result.downcast_dict::<StringArray>().unwrap();
1754        let collected: Vec<_> = string_result.into_iter().collect();
1755        assert_eq!(
1756            collected,
1757            vec![
1758                Some("foo"),
1759                None,
1760                Some("baz"),
1761                None,
1762                Some("bar"),
1763                Some("qux")
1764            ]
1765        );
1766    }
1767
1768    #[test]
1769    fn test_interleave_bytes_offset_overflow() {
1770        let indices: Vec<(usize, usize)> = vec![(0, 0); (i32::MAX >> 4) as usize];
1771        let text = ('a'..='z').collect::<String>();
1772        let values = StringArray::from(vec![Some(text)]);
1773        assert!(matches!(
1774            interleave(&[&values], &indices),
1775            Err(ArrowError::OffsetOverflowError(_))
1776        ));
1777    }
1778
1779    #[test]
1780    fn test_interleave_list_offset_overflow() {
1781        // Build a ListArray<i32> with a single row containing many elements
1782        let mut builder = GenericListBuilder::<i32, _>::new(Int32Builder::new());
1783        for i in 0..32 {
1784            builder.values().append_value(i);
1785        }
1786        builder.append(true);
1787        let list = builder.finish();
1788
1789        // Interleave enough copies to overflow i32 offsets
1790        let indices: Vec<(usize, usize)> = vec![(0, 0); (i32::MAX as usize / 32) + 1];
1791        assert!(matches!(
1792            interleave(&[&list], &indices),
1793            Err(ArrowError::OffsetOverflowError(_))
1794        ));
1795    }
1796
1797    #[test]
1798    fn test_interleave_list_view() {
1799        // `interleave` for ListView falls through to `interleave_fallback`, which uses
1800        // `MutableArrayData`. `list_view::build_extend` copies offsets/sizes but never
1801        // extends the child array, so the result contains offsets/sizes that reference
1802        // positions in the now-absent original child arrays while the child is empty.
1803        //
1804        // lv_a: [[1, 2], [3]]   (values=[1,2,3], offsets=[0,2], sizes=[2,1])
1805        // lv_b: [[4, 5, 6]]     (values=[4,5,6], offsets=[0],   sizes=[3])
1806        // interleave at [(0,0), (1,0), (0,1)] should produce [[1, 2], [4, 5, 6], [3]]
1807        let field = Arc::new(Field::new_list_field(DataType::Int64, false));
1808
1809        let lv_a = ListViewArray::new(
1810            Arc::clone(&field),
1811            ScalarBuffer::from(vec![0i32, 2]),
1812            ScalarBuffer::from(vec![2i32, 1]),
1813            Arc::new(Int64Array::from(vec![1_i64, 2, 3])),
1814            None,
1815        );
1816        let lv_b = ListViewArray::new(
1817            field,
1818            ScalarBuffer::from(vec![0i32]),
1819            ScalarBuffer::from(vec![3i32]),
1820            Arc::new(Int64Array::from(vec![4_i64, 5, 6])),
1821            None,
1822        );
1823
1824        let result = interleave(
1825            &[&lv_a as &dyn Array, &lv_b as &dyn Array],
1826            &[(0, 0), (1, 0), (0, 1)],
1827        )
1828        .unwrap();
1829
1830        result
1831            .to_data()
1832            .validate_full()
1833            .expect("interleaved ListViewArray must be internally consistent");
1834
1835        let result_lv = result.as_list_view::<i32>();
1836        assert_eq!(result_lv.len(), 3);
1837        assert_eq!(
1838            result_lv.value(0).as_primitive::<Int64Type>().values(),
1839            &[1, 2]
1840        );
1841        assert_eq!(
1842            result_lv.value(1).as_primitive::<Int64Type>().values(),
1843            &[4, 5, 6]
1844        );
1845        assert_eq!(
1846            result_lv.value(2).as_primitive::<Int64Type>().values(),
1847            &[3]
1848        );
1849    }
1850}