arrow-array 33.0.0

Array abstractions for Apache Arrow
Documentation
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Idiomatic iterator for [`RunArray`](crate::RunArray)

use arrow_buffer::ArrowNativeType;

use crate::{array::ArrayAccessor, types::RunEndIndexType, Array, TypedRunArray};

/// The [`RunArrayIter`] provides an idiomatic way to iterate over the run array.
/// Each item is `Some(value)` if there is a value or `None` if the value is null.
///
/// The iterator comes with a cost as it has to consult several arrays to determine
/// the value to be returned. The run_ends array is used to determine the physical index
/// of the value. The values array is used both to determine if the value is null and to
/// get the value itself.
///
/// Unlike other iterators in this crate, [`RunArrayIter`] does not go through the
/// [`ArrayAccessor`] of the run array itself, because the run array accessor does a
/// binary search to access each value which is too slow. The run array iterator can
/// determine the next value in constant time by remembering its physical position.
///
/// The iterator keeps two independent cursors, one advancing from the front and one
/// retreating from the back, each made of a logical index and a physical index.
#[derive(Debug)]
pub struct RunArrayIter<'a, R, V>
where
    R: RunEndIndexType,
    V: Sync + Send,
    &'a V: ArrayAccessor,
    <&'a V as ArrayAccessor>::Item: Default,
{
    /// The typed run array being iterated.
    array: TypedRunArray<'a, R, V>,
    /// Logical index of the element the next call to `next` yields.
    current_logical: usize,
    /// Physical index (into run_ends / values) used by the front cursor.
    current_physical: usize,
    /// Exclusive upper bound of the logical indices not yet yielded; `next_back`
    /// decrements it before reading an element.
    current_end_logical: usize,
    /// Physical index used by the back cursor. `new` initialises it to the length of
    /// the values array, i.e. one past the last physical entry.
    current_end_physical: usize,
}

impl<'a, R, V> RunArrayIter<'a, R, V>
where
    R: RunEndIndexType,
    V: Sync + Send,
    &'a V: ArrayAccessor,
    <&'a V as ArrayAccessor>::Item: Default,
{
    /// Creates an iterator over `array`.
    ///
    /// The front cursor starts at logical and physical index zero. The back cursor
    /// starts one past the end: at the logical length of the run array and at the
    /// length of its values array.
    pub fn new(array: TypedRunArray<'a, R, V>) -> Self {
        Self {
            current_logical: 0,
            current_physical: 0,
            current_end_logical: array.len(),
            current_end_physical: array.values().len(),
            // Moved last so the lengths above can still borrow `array`.
            array,
        }
    }
}

impl<'a, R, V> Iterator for RunArrayIter<'a, R, V>
where
    R: RunEndIndexType,
    V: Sync + Send,
    &'a V: ArrayAccessor,
    <&'a V as ArrayAccessor>::Item: Default,
{
    type Item = Option<<&'a V as ArrayAccessor>::Item>;

    /// Yields the next logical element from the front: `Some(None)` for a null,
    /// `Some(Some(value))` otherwise, and `None` once the front cursor has met the
    /// back cursor.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        if self.current_logical == self.current_end_logical {
            return None;
        }

        // When the logical cursor has reached the end of the current run, move on to
        // the following run. Run ends are expected to be strictly increasing, so every
        // physical entry covers at least one logical entry and a single step suffices.
        let run_end = self
            .array
            .run_ends()
            .value(self.current_physical)
            .as_usize();
        if self.current_logical >= run_end {
            self.current_physical += 1;
        }

        let physical = self.current_physical;
        let is_null = self.array.values().is_null(physical);
        self.current_logical += 1;

        if is_null {
            return Some(None);
        }

        // SAFETY:
        // The logical cursor was checked against `self.current_end_logical` above
        // (`self.current_logical == self.current_end_logical` returns early), and the
        // physical cursor only advances in step with it, so `physical` is expected to
        // stay within the bounds of the values array.
        // NOTE(review): this relies on the run array upholding the strictly increasing
        // run_ends invariant; it is not re-validated here.
        let value = unsafe { self.array.values().value_unchecked(physical) };
        Some(Some(value))
    }

    /// Reports the exact number of elements left between the two cursors.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.current_end_logical - self.current_logical;
        (remaining, Some(remaining))
    }
}

impl<'a, R, V> DoubleEndedIterator for RunArrayIter<'a, R, V>
where
    R: RunEndIndexType,
    V: Sync + Send,
    &'a V: ArrayAccessor,
    <&'a V as ArrayAccessor>::Item: Default,
{
    /// Yields the next logical element from the back: `Some(None)` for a null,
    /// `Some(Some(value))` otherwise, and `None` once the back cursor has met the
    /// front cursor.
    fn next_back(&mut self) -> Option<Self::Item> {
        if self.current_end_logical == self.current_logical {
            return None;
        }

        self.current_end_logical -= 1;

        // Step back to the previous run when the logical index now falls before the
        // end of that run. Run ends are expected to be strictly increasing, so every
        // physical entry covers at least one logical entry and a single step suffices.
        // The `> 0` guard keeps `current_end_physical - 1` from underflowing.
        if self.current_end_physical > 0 {
            let previous_run_end = self
                .array
                .run_ends()
                .value(self.current_end_physical - 1)
                .as_usize();
            if self.current_end_logical < previous_run_end {
                self.current_end_physical -= 1;
            }
        }

        let physical = self.current_end_physical;
        if self.array.values().is_null(physical) {
            return Some(None);
        }

        // SAFETY:
        // `self.current_end_physical` starts at the length of the values array and is
        // only ever decremented, guarded against underflow above, in step with
        // `self.current_end_logical`, so `physical` is expected to index an existing
        // entry of the values array.
        // NOTE(review): this relies on the run array upholding the strictly increasing
        // run_ends invariant; it is not re-validated here.
        let value = unsafe { self.array.values().value_unchecked(physical) };
        Some(Some(value))
    }
}

/// The number of remaining elements is always known exactly: `size_hint` reports the
/// distance between the front and back logical cursors as both its lower and upper bound.
impl<'a, R, V> ExactSizeIterator for RunArrayIter<'a, R, V>
where
    R: RunEndIndexType,
    V: Sync + Send,
    &'a V: ArrayAccessor,
    <&'a V as ArrayAccessor>::Item: Default,
{
}

#[cfg(test)]
mod tests {
    use rand::{seq::SliceRandom, thread_rng, Rng};

    use crate::{
        array::{Int32Array, StringArray},
        builder::PrimitiveRunBuilder,
        types::Int32Type,
        Int64RunArray,
    };

    /// Builds a random vector of exactly `size` optional integers made of runs.
    ///
    /// A pool of three nulls and the values 1 through 9 is shuffled, then each pool
    /// entry is repeated a random number of times. The pool is reshuffled every time
    /// it has been walked completely.
    fn build_input_array(size: usize) -> Vec<Option<i32>> {
        let mut seed: Vec<Option<i32>> = std::iter::repeat(None)
            .take(3)
            .chain((1..=9).map(Some))
            .collect();
        let mut rng = thread_rng();

        // Runs are at most 8 long; smaller arrays cap the run length at size / 2,
        // but never below 1.
        let max_run_length = (size / 2).clamp(1, 8);

        let mut result: Vec<Option<i32>> = Vec::with_capacity(size);
        let mut ix = 0;
        while result.len() < size {
            // Reshuffle whenever a fresh pass over the pool starts.
            if ix == 0 {
                seed.shuffle(&mut rng);
            }
            let run_length = rng.gen_range(1..=max_run_length);
            result.extend(std::iter::repeat(seed[ix]).take(run_length));
            ix = (ix + 1) % seed.len();
        }

        // The last run may overshoot the requested size; drop the excess.
        result.truncate(size);
        result
    }

    /// Values pushed through a run builder come back unchanged from the iterator,
    /// both front to back and back to front.
    #[test]
    fn test_primitive_array_iter_round_trip() {
        // Runs of length 2, 1 (null), 3 and 1.
        let expected_forward = vec![
            Some(32),
            Some(32),
            None,
            Some(64),
            Some(64),
            Some(64),
            Some(72),
        ];
        let expected_backward: Vec<Option<i32>> =
            expected_forward.iter().rev().copied().collect();

        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
        builder.extend(expected_forward.iter().copied());
        let run_array = builder.finish();
        let typed = run_array.downcast::<Int32Array>().unwrap();

        let forward: Vec<Option<i32>> = typed.into_iter().collect();
        assert_eq!(expected_forward, forward);

        let backward: Vec<Option<i32>> = typed.into_iter().rev().collect();
        assert_eq!(expected_backward, backward);
    }

    /// Alternating `next` and `next_back` calls consume the array from both ends and
    /// stop once the two cursors meet.
    #[test]
    fn test_double_ended() {
        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
        builder.extend(vec![
            Some(32),
            Some(32),
            None,
            Some(64),
            Some(64),
            Some(64),
            Some(72),
        ]);
        let run_array = builder.finish();
        let typed = run_array.downcast::<Int32Array>().unwrap();

        // Each step is (take from the back?, expected item).
        let steps: [(bool, Option<Option<i32>>); 9] = [
            (false, Some(Some(32))),
            (true, Some(Some(72))),
            (false, Some(Some(32))),
            (true, Some(Some(64))),
            (false, Some(None)),
            (true, Some(Some(64))),
            (false, Some(Some(64))),
            (true, None),
            (false, None),
        ];

        let mut iter = typed.into_iter();
        for (from_back, expected) in steps {
            let actual = if from_back {
                iter.next_back()
            } else {
                iter.next()
            };
            assert_eq!(expected, actual);
        }
    }

    /// Checks forward and backward iteration against randomly generated inputs of
    /// several logical lengths.
    #[test]
    fn test_run_iterator_comprehensive() {
        for logical_len in [1_usize, 2, 3, 4, 15, 16, 17, 63, 64, 65] {
            let expected = build_input_array(logical_len);

            let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
            builder.extend(expected.iter().copied());
            let run_array = builder.finish();
            let typed_array = run_array.downcast::<Int32Array>().unwrap();

            // Front to back: every expected item in order, then exhaustion.
            let mut forward = typed_array.into_iter();
            for item in expected.iter().copied() {
                assert_eq!(Some(item), forward.next());
            }
            assert_eq!(None, forward.next());

            // Back to front: every expected item in reverse order, then exhaustion.
            let mut backward = typed_array.into_iter().rev();
            for item in expected.iter().rev().copied() {
                assert_eq!(Some(item), backward.next());
            }
            assert_eq!(None, backward.next());
        }
    }

    /// String values can be read through the iterator, transformed, and collected.
    #[test]
    fn test_string_array_iter_round_trip() {
        let run_array: Int64RunArray =
            vec!["ab", "ab", "ba", "cc", "cc"].into_iter().collect();
        let typed = run_array.downcast::<StringArray>().unwrap();

        // Append a 'b' to every non-null value coming out of the iterator.
        let suffixed: Vec<Option<String>> = typed
            .into_iter()
            .map(|item| item.map(|s| format!("{}b", s)))
            .collect();
        let actual: Vec<Option<&str>> =
            suffixed.iter().map(|item| item.as_deref()).collect();

        let expected = vec![
            Some("abb"),
            Some("abb"),
            Some("bab"),
            Some("ccb"),
            Some("ccb"),
        ];
        assert_eq!(expected, actual);
    }
}