arrow-array 54.3.1

Array abstractions for Apache Arrow
Documentation
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Idiomatic iterator for [`RunArray`](crate::RunArray)

use crate::{array::ArrayAccessor, types::RunEndIndexType, Array, TypedRunArray};
use arrow_buffer::ArrowNativeType;

/// The [`RunArrayIter`] provides an idiomatic way to iterate over the logical values of
/// a run array through its [`TypedRunArray`] view.
/// Each item is `Some(value)` if there is a value or `None` if the value is null.
///
/// The iterator comes with a cost as it has to consult several buffers to determine
/// the value to be returned. The run_ends array is used to determine the physical index
/// of the value, the validity of the values array at that index determines whether the
/// value is null, and the values array itself supplies the value.
///
/// Unlike other iterators in this crate, [`RunArrayIter`] does not go through the
/// [`ArrayAccessor`] of the run array, because the run array accessor does a binary
/// search to access each value which is too slow. The run array iterator instead keeps
/// a physical cursor next to each logical cursor and moves it by at most one run per step.
///
/// Two independent cursors are kept, one advancing from the front and one retreating
/// from the back; iteration ends when their logical positions meet.
///
#[derive(Debug)]
pub struct RunArrayIter<'a, R, V>
where
    R: RunEndIndexType,
    V: Sync + Send,
    &'a V: ArrayAccessor,
    <&'a V as ArrayAccessor>::Item: Default,
{
    /// The typed run array being iterated.
    array: TypedRunArray<'a, R, V>,
    /// Logical index of the element the next call to `next` will return.
    /// Starts at `array.offset()`.
    current_front_logical: usize,
    /// Physical index (into the run ends and the values) used by the front cursor.
    /// Starts at the run array's `get_start_physical_index()`.
    current_front_physical: usize,
    /// One past the logical index of the element the next call to `next_back` will
    /// return. Starts at `array.offset() + array.len()`.
    current_back_logical: usize,
    /// Physical index (into the run ends and the values) used by the back cursor.
    /// Starts at the run array's `get_end_physical_index() + 1`; `next_back` steps it
    /// down before reading a value.
    current_back_physical: usize,
}

impl<'a, R, V> RunArrayIter<'a, R, V>
where
    R: RunEndIndexType,
    V: Sync + Send,
    &'a V: ArrayAccessor,
    <&'a V as ArrayAccessor>::Item: Default,
{
    /// Creates an iterator over all logical elements of `array`.
    ///
    /// The front cursor is placed on the array's logical offset and its start physical
    /// index. The back cursor is placed one past the last logical element and one past
    /// the end physical index, so that the first `next_back` call steps onto them.
    pub fn new(array: TypedRunArray<'a, R, V>) -> Self {
        let front_physical = array.run_array().get_start_physical_index();
        let back_physical = array.run_array().get_end_physical_index() + 1;
        let front_logical = array.offset();
        let back_logical = array.offset() + array.len();

        Self {
            array,
            current_front_logical: front_logical,
            current_front_physical: front_physical,
            current_back_logical: back_logical,
            current_back_physical: back_physical,
        }
    }
}

impl<'a, R, V> Iterator for RunArrayIter<'a, R, V>
where
    R: RunEndIndexType,
    V: Sync + Send,
    &'a V: ArrayAccessor,
    <&'a V as ArrayAccessor>::Item: Default,
{
    type Item = Option<<&'a V as ArrayAccessor>::Item>;

    /// Returns the next logical element from the front, or `None` once the front
    /// cursor has reached the back cursor.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        if self.current_front_logical == self.current_back_logical {
            return None;
        }

        // A logical index that is greater than or equal to the current run end no
        // longer belongs to that run. As the run_ends are expected to be strictly
        // increasing, every physical entry covers at least one logical entry, so a
        // single increment of the physical index is enough to reach the right run.
        let run_ends = self.array.run_ends().values();
        let front_run_end = run_ends[self.current_front_physical].as_usize();
        if front_run_end <= self.current_front_logical {
            self.current_front_physical += 1;
        }

        let values = self.array.values();
        let physical = self.current_front_physical;
        let is_null = values.is_null(physical);
        self.current_front_logical += 1;
        if is_null {
            return Some(None);
        }

        // SAFETY:
        // `current_front_physical` only advances when `current_front_logical` crosses
        // a run end, and `current_front_logical` was checked above to be different
        // from `current_back_logical`, i.e. it is still inside the iterated range.
        // Given strictly increasing run ends, the physical index therefore refers to
        // an existing entry of the values array.
        Some(Some(unsafe { values.value_unchecked(physical) }))
    }

    /// Reports the exact number of remaining elements as both lower and upper bound.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.current_back_logical - self.current_front_logical;
        (remaining, Some(remaining))
    }
}

impl<'a, R, V> DoubleEndedIterator for RunArrayIter<'a, R, V>
where
    R: RunEndIndexType,
    V: Sync + Send,
    &'a V: ArrayAccessor,
    <&'a V as ArrayAccessor>::Item: Default,
{
    /// Returns the next logical element from the back, or `None` once the back
    /// cursor has reached the front cursor.
    fn next_back(&mut self) -> Option<Self::Item> {
        if self.current_back_logical == self.current_front_logical {
            return None;
        }

        self.current_back_logical -= 1;

        // Step back one physical entry when the logical index lies before the end of
        // the preceding run. As the run_ends are expected to be strictly increasing,
        // every physical entry covers at least one logical entry, so a single
        // decrement of the physical index is enough to reach the right run.
        let run_ends = self.array.run_ends().values();
        if let Some(previous) = self.current_back_physical.checked_sub(1) {
            if self.current_back_logical < run_ends[previous].as_usize() {
                self.current_back_physical = previous;
            }
        }

        let values = self.array.values();
        let physical = self.current_back_physical;
        let item = if values.is_null(physical) {
            None
        } else {
            // SAFETY:
            // `checked_sub` above guarantees the physical index never underflows.
            // `current_back_physical` only retreats as `current_back_logical` does,
            // and `current_back_logical` was checked above to be different from
            // `current_front_logical`, i.e. it is still inside the iterated range.
            // Given strictly increasing run ends, the physical index therefore
            // refers to an existing entry of the values array.
            Some(unsafe { values.value_unchecked(physical) })
        };
        Some(item)
    }
}

/// The remaining length of the iterator is always known exactly: `size_hint` reports
/// the distance between the back and front logical cursors as both its lower and its
/// upper bound, so the default [`ExactSizeIterator`] methods can be used as is.
impl<'a, R, V> ExactSizeIterator for RunArrayIter<'a, R, V>
where
    R: RunEndIndexType,
    V: Sync + Send,
    &'a V: ArrayAccessor,
    <&'a V as ArrayAccessor>::Item: Default,
{
}

#[cfg(test)]
mod tests {
    use rand::{seq::SliceRandom, thread_rng, Rng};

    use crate::{
        array::{Int32Array, StringArray},
        builder::PrimitiveRunBuilder,
        types::{Int16Type, Int32Type},
        Array, Int64RunArray, PrimitiveArray, RunArray,
    };

    /// Builds a random input of exactly `size` optional values containing runs.
    ///
    /// The seed values (three nulls and `1..=9`) are shuffled, then each one is
    /// repeated a random number of times. Once all seed values are used up they are
    /// shuffled again and the process repeats until `size` elements were produced.
    fn build_input_array(size: usize) -> Vec<Option<i32>> {
        let mut seed: Vec<Option<i32>> = vec![
            None,
            None,
            None,
            Some(1),
            Some(2),
            Some(3),
            Some(4),
            Some(5),
            Some(6),
            Some(7),
            Some(8),
            Some(9),
        ];
        let mut rng = thread_rng();
        // Run length can go up to 8. Cap it at `size / 2` for smaller arrays, but
        // never below 1 so that every step makes progress.
        let max_run_length = (size / 2).clamp(1, 8);

        let mut result = Vec::with_capacity(size);
        let mut ix = 0;
        while result.len() < size {
            // Reshuffle every time the seed values have been used up.
            if ix == 0 {
                seed.shuffle(&mut rng);
            }
            let run_length = rng.gen_range(1..=max_run_length);
            result.extend(std::iter::repeat(seed[ix]).take(run_length));
            ix = (ix + 1) % seed.len();
        }
        // The last run may overshoot the requested size.
        result.truncate(size);
        result
    }

    /// Collecting the iterator forwards and backwards reproduces the input values.
    #[test]
    fn test_primitive_array_iter_round_trip() {
        let expected = vec![
            Some(32),
            Some(32),
            None,
            Some(64),
            Some(64),
            Some(64),
            Some(72),
        ];
        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
        builder.extend(expected.iter().copied());
        let run_array = builder.finish();
        let typed_array = run_array.downcast::<Int32Array>().unwrap();

        let forward: Vec<Option<i32>> = typed_array.into_iter().collect();
        assert_eq!(expected, forward);

        let backward: Vec<Option<i32>> = typed_array.into_iter().rev().collect();
        let expected_reversed: Vec<Option<i32>> = expected.iter().rev().copied().collect();
        assert_eq!(expected_reversed, backward);
    }

    /// Alternating `next` and `next_back` consumes the array from both ends until the
    /// two cursors meet, after which both directions yield `None`.
    #[test]
    fn test_double_ended() {
        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
        builder.extend(vec![
            Some(32),
            Some(32),
            None,
            Some(64),
            Some(64),
            Some(64),
            Some(72),
        ]);
        let run_array = builder.finish();
        let typed_array = run_array.downcast::<Int32Array>().unwrap();

        // Each step is (take from the front?, expected item).
        let steps: [(bool, Option<Option<i32>>); 9] = [
            (true, Some(Some(32))),
            (false, Some(Some(72))),
            (true, Some(Some(32))),
            (false, Some(Some(64))),
            (true, Some(None)),
            (false, Some(Some(64))),
            (true, Some(Some(64))),
            (false, None),
            (true, None),
        ];

        let mut iter = typed_array.into_iter();
        for (from_front, expected) in steps {
            let actual = if from_front {
                iter.next()
            } else {
                iter.next_back()
            };
            assert_eq!(expected, actual);
        }
    }

    /// Checks forward and backward iteration element by element for a range of
    /// array lengths.
    #[test]
    fn test_run_iterator_comprehensive() {
        for logical_len in [1_usize, 2, 3, 4, 15, 16, 17, 63, 64, 65] {
            let input_array = build_input_array(logical_len);

            let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
            builder.extend(input_array.iter().copied());
            let run_array = builder.finish();
            let typed_array = run_array.downcast::<Int32Array>().unwrap();

            // Forward: every input element is produced in order, then the end.
            let mut forward = typed_array.into_iter();
            for expected in input_array.iter().copied() {
                assert_eq!(Some(expected), forward.next());
            }
            assert_eq!(None, forward.next());

            // Backward: every input element is produced in reverse order, then the end.
            let mut backward = typed_array.into_iter().rev();
            for expected in input_array.iter().rev().copied() {
                assert_eq!(Some(expected), backward.next());
            }
            assert_eq!(None, backward.next());
        }
    }

    /// Iterates a string run array, appends `b` to every value and compares the
    /// outcome with the expected strings.
    #[test]
    fn test_string_array_iter_round_trip() {
        let run_array: Int64RunArray = vec!["ab", "ab", "ba", "cc", "cc"].into_iter().collect();
        let typed_array = run_array.downcast::<StringArray>().unwrap();

        // Map every non-null value to an owned string with a trailing 'b'.
        let suffixed: Vec<Option<String>> = typed_array
            .into_iter()
            .map(|item| item.map(|text| format!("{text}b")))
            .collect();

        let actual: Vec<Option<&str>> = suffixed.iter().map(|item| item.as_deref()).collect();
        let expected = vec![
            Some("abb"),
            Some("abb"),
            Some("bab"),
            Some("ccb"),
            Some("ccb"),
        ];

        assert_eq!(expected, actual);
    }

    /// Iterating a sliced run array yields exactly the matching slice of the input,
    /// both for slices anchored at the start and for slices anchored at the end.
    #[test]
    #[cfg_attr(miri, ignore)] // Takes too long
    fn test_sliced_run_array_iterator() {
        let total_len = 80;
        let input_array = build_input_array(total_len);

        // Encode the input_array to run array
        let mut builder =
            PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
        builder.extend(input_array.iter().copied());
        let run_array = builder.finish();

        // Slices the run array, iterates the slice and compares it with the same
        // window of the input.
        let assert_slice_matches = |offset: usize, length: usize| {
            let sliced: RunArray<Int16Type> = run_array.slice(offset, length).into_data().into();
            let typed = sliced.downcast::<PrimitiveArray<Int32Type>>().unwrap();

            let actual: Vec<Option<i32>> = typed.into_iter().collect();
            let expected: Vec<Option<i32>> = input_array[offset..offset + length].to_vec();
            assert_eq!(expected, actual);
        };

        // test for all slice lengths.
        for slice_len in 1..=total_len {
            // offset = 0, length = slice_len
            assert_slice_matches(0, slice_len);
            // offset = total_len - slice_len, length = slice_len
            assert_slice_matches(total_len - slice_len, slice_len);
        }
    }
}