use crate::{array::ArrayAccessor, types::RunEndIndexType, Array, TypedRunArray};
use arrow_buffer::ArrowNativeType;
/// Double-ended iterator over the logical elements of a [`TypedRunArray`].
///
/// The iterator keeps two cursors, one for each end. Each cursor is a pair of
/// a *logical* index (a position compared against the run-end values) and a
/// *physical* index (a position in the run-ends / values arrays).
/// Iteration stops when the two logical cursors meet.
#[derive(Debug)]
pub struct RunArrayIter<'a, R, V>
where
R: RunEndIndexType,
V: Sync + Send,
&'a V: ArrayAccessor,
<&'a V as ArrayAccessor>::Item: Default,
{
/// The typed run array being iterated.
array: TypedRunArray<'a, R, V>,
/// Logical index of the next element yielded by `next`; starts at the
/// array's offset.
current_front_logical: usize,
/// Physical index used to look up the value for `current_front_logical`.
current_front_physical: usize,
/// One past the logical index of the next element yielded by `next_back`
/// (exclusive bound); starts at `offset + len`.
current_back_logical: usize,
/// Physical index used by `next_back`. It starts one past the array's end
/// physical index and is decremented lazily on the way down.
current_back_physical: usize,
}
impl<'a, R, V> RunArrayIter<'a, R, V>
where
    R: RunEndIndexType,
    V: Sync + Send,
    &'a V: ArrayAccessor,
    <&'a V as ArrayAccessor>::Item: Default,
{
    /// Creates an iterator positioned over the whole logical range of `array`.
    ///
    /// The front cursor starts at the array's offset and the back cursor at
    /// `offset + len` (exclusive). The physical cursors are taken from the
    /// underlying run array's start and end physical indices; the back one is
    /// stored one past the end so that `next_back` can step down into it.
    pub fn new(array: TypedRunArray<'a, R, V>) -> Self {
        let front_physical = array.run_array().get_start_physical_index();
        let back_physical = array.run_array().get_end_physical_index() + 1;
        let front_logical = array.offset();
        let back_logical = array.offset() + array.len();

        Self {
            array,
            current_front_logical: front_logical,
            current_front_physical: front_physical,
            current_back_logical: back_logical,
            current_back_physical: back_physical,
        }
    }
}
impl<'a, R, V> Iterator for RunArrayIter<'a, R, V>
where
    R: RunEndIndexType,
    V: Sync + Send,
    &'a V: ArrayAccessor,
    <&'a V as ArrayAccessor>::Item: Default,
{
    /// `None` for a null entry of the values array, `Some(value)` otherwise.
    type Item = Option<<&'a V as ArrayAccessor>::Item>;

    /// Yields the element at the front cursor and advances that cursor by one
    /// logical position. Returns `None` once the front and back cursors meet.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        if self.current_front_logical == self.current_back_logical {
            return None;
        }

        // Once the logical position reaches the end of the current run, step
        // to the next physical entry. A single step is enough as long as
        // every run covers at least one logical position.
        let run_ends = self.array.run_ends().values();
        let front_run_end = run_ends[self.current_front_physical].as_usize();
        if self.current_front_logical >= front_run_end {
            self.current_front_physical += 1;
        }

        let values = self.array.values();
        let physical = self.current_front_physical;
        let is_null = values.is_null(physical);
        self.current_front_logical += 1;
        if is_null {
            return Some(None);
        }

        // SAFETY: the emptiness check above guarantees a logical position was
        // still available, and the physical index is only advanced when the
        // logical position crosses a run end. Assuming the values array holds
        // one entry per run end, `physical` is therefore in bounds.
        let value = unsafe { values.value_unchecked(physical) };
        Some(Some(value))
    }

    /// Reports the exact number of remaining elements as both lower and upper
    /// bound: the distance between the two logical cursors.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.current_back_logical - self.current_front_logical;
        (remaining, Some(remaining))
    }
}
impl<'a, R, V> DoubleEndedIterator for RunArrayIter<'a, R, V>
where
    R: RunEndIndexType,
    V: Sync + Send,
    &'a V: ArrayAccessor,
    <&'a V as ArrayAccessor>::Item: Default,
{
    /// Yields the element just before the back cursor and moves that cursor
    /// down by one logical position. Returns `None` once the cursors meet.
    fn next_back(&mut self) -> Option<Self::Item> {
        if self.current_back_logical == self.current_front_logical {
            return None;
        }

        // The back cursor is exclusive, so step down first to land on the
        // element that is about to be yielded.
        self.current_back_logical -= 1;

        // If the new logical position lies below the end of the preceding run,
        // it belongs to that run: step the physical cursor down by one.
        // `checked_sub` doubles as the "already at physical index 0" guard.
        let run_ends = self.array.run_ends().values();
        if let Some(previous) = self.current_back_physical.checked_sub(1) {
            if self.current_back_logical < run_ends[previous].as_usize() {
                self.current_back_physical = previous;
            }
        }

        let values = self.array.values();
        let physical = self.current_back_physical;
        if values.is_null(physical) {
            return Some(None);
        }

        // SAFETY: the emptiness check above guarantees a logical position was
        // still available, and the physical cursor has just been moved onto
        // the run that position was compared against. Assuming the values
        // array holds one entry per run end, `physical` is in bounds.
        let value = unsafe { values.value_unchecked(physical) };
        Some(Some(value))
    }
}
/// Marker impl with no overridden methods: `size_hint` of the `Iterator` impl
/// returns the same value for its lower and upper bound (the distance between
/// the back and front logical cursors), so the default `len()` is exact.
impl<'a, R, V> ExactSizeIterator for RunArrayIter<'a, R, V>
where
R: RunEndIndexType,
V: Sync + Send,
&'a V: ArrayAccessor,
<&'a V as ArrayAccessor>::Item: Default,
{
}
#[cfg(test)]
mod tests {
use rand::{seq::SliceRandom, thread_rng, Rng};
use crate::{
array::{Int32Array, StringArray},
builder::PrimitiveRunBuilder,
types::{Int16Type, Int32Type},
Array, Int64RunArray, PrimitiveArray, RunArray,
};
/// Builds a randomised test input of exactly `size` optional integers.
///
/// The output is produced by repeatedly shuffling a fixed pool of seed values
/// (three nulls plus `1..=9`) and emitting each seed value a random number of
/// times, so the result is made of runs suitable for run-end encoding.
fn build_input_array(size: usize) -> Vec<Option<i32>> {
    let mut seed: Vec<Option<i32>> = vec![
        None,
        None,
        None,
        Some(1),
        Some(2),
        Some(3),
        Some(4),
        Some(5),
        Some(6),
        Some(7),
        Some(8),
        Some(9),
    ];
    let mut result: Vec<Option<i32>> = Vec::with_capacity(size);
    let mut ix = 0;
    let mut rng = thread_rng();

    // Runs are at most 8 long; for small arrays the cap is `size / 2`, but
    // never less than 1 so the loop below always makes progress.
    let max_run_length = (size / 2).clamp(1, 8);

    while result.len() < size {
        // Reshuffle the pool every time it has been fully consumed.
        if ix == 0 {
            seed.shuffle(&mut rng);
        }

        // `gen_range` with an inclusive upper bound never exceeds
        // `max_run_length`, so no additional clamping is needed.
        let run_length = rng.gen_range(1..=max_run_length);
        result.extend(std::iter::repeat(seed[ix]).take(run_length));

        ix = (ix + 1) % seed.len();
    }

    // The last run may overshoot the requested size; trim the excess.
    result.truncate(size);
    result
}
/// A primitive run array yields its logical values in order when iterated
/// forwards, and in reverse order when iterated through `rev()`.
#[test]
fn test_primitive_array_iter_round_trip() {
    let expected_forward = vec![
        Some(32),
        Some(32),
        None,
        Some(64),
        Some(64),
        Some(64),
        Some(72),
    ];

    let mut run_builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
    run_builder.extend(expected_forward.iter().copied());
    let run_array = run_builder.finish();
    let typed_array = run_array.downcast::<Int32Array>().unwrap();

    // Front-to-back iteration reproduces the input.
    let forward: Vec<Option<i32>> = typed_array.into_iter().collect();
    assert_eq!(expected_forward, forward);

    // Back-to-front iteration reproduces the reversed input.
    let backward: Vec<Option<i32>> = typed_array.into_iter().rev().collect();
    let expected_backward: Vec<Option<i32>> = expected_forward.iter().rev().copied().collect();
    assert_eq!(expected_backward, backward);
}
/// Alternating `next` / `next_back` calls consume the array from both ends
/// and both report exhaustion once the cursors meet in the middle.
#[test]
fn test_double_ended() {
    let input = vec![
        Some(32),
        Some(32),
        None,
        Some(64),
        Some(64),
        Some(64),
        Some(72),
    ];
    let mut run_builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
    run_builder.extend(input);
    let run_array = run_builder.finish();
    let typed_array = run_array.downcast::<Int32Array>().unwrap();
    let mut iter = typed_array.into_iter();

    // Each row is (expected from `next`, expected from `next_back`), applied
    // in that order. The seven elements run out on the final `next_back`.
    let steps = [
        (Some(Some(32)), Some(Some(72))),
        (Some(Some(32)), Some(Some(64))),
        (Some(None), Some(Some(64))),
        (Some(Some(64)), None),
    ];
    for &(from_front, from_back) in &steps {
        assert_eq!(from_front, iter.next());
        assert_eq!(from_back, iter.next_back());
    }

    // The front end is exhausted as well.
    assert_eq!(None, iter.next());
}
/// For a range of array lengths, forward and reverse iteration over a run
/// array built from randomised input both reproduce that input exactly and
/// then report exhaustion.
#[test]
fn test_run_iterator_comprehensive() {
    const LOGICAL_LENGTHS: [usize; 10] = [1, 2, 3, 4, 15, 16, 17, 63, 64, 65];

    for logical_len in LOGICAL_LENGTHS.iter().copied() {
        let input = build_input_array(logical_len);

        let mut run_builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
        run_builder.extend(input.iter().copied());
        let run_array = run_builder.finish();
        let typed_array = run_array.downcast::<Int32Array>().unwrap();

        // Forward pass: element-by-element comparison, then exhaustion.
        let mut forward = typed_array.into_iter();
        for expected in input.iter().copied() {
            assert_eq!(Some(expected), forward.next());
        }
        assert_eq!(None, forward.next());

        // Reverse pass over the same array.
        let mut backward = typed_array.into_iter().rev();
        for expected in input.iter().rev().copied() {
            assert_eq!(Some(expected), backward.next());
        }
        assert_eq!(None, backward.next());
    }
}
/// Iterating a string-valued run array yields borrowed strings that can be
/// mapped into owned, modified values in logical order.
#[test]
fn test_string_array_iter_round_trip() {
    let source = vec!["ab", "ab", "ba", "cc", "cc"];
    let run_array: Int64RunArray = source.into_iter().collect();
    let typed_array = run_array.downcast::<StringArray>().unwrap();

    // Append a 'b' to every non-null element.
    let suffixed: Vec<Option<String>> = typed_array
        .into_iter()
        .map(|item| item.map(|text| format!("{}b", text)))
        .collect();

    // Borrow the owned strings so they compare against string literals.
    let suffixed_refs: Vec<Option<&str>> = suffixed.iter().map(|item| item.as_deref()).collect();

    let expected = vec![
        Some("abb"),
        Some("abb"),
        Some("bab"),
        Some("ccb"),
        Some("ccb"),
    ];
    assert_eq!(expected, suffixed_refs);
}
/// Iterating a sliced run array yields exactly the sliced logical range, for
/// every prefix and every suffix length of an 80-element array.
///
/// The test is ignored when running under Miri.
#[test]
#[cfg_attr(miri, ignore)]
fn test_sliced_run_array_iterator() {
    let total_len = 80;
    let input = build_input_array(total_len);

    let mut run_builder =
        PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input.len());
    run_builder.extend(input.iter().copied());
    let run_array = run_builder.finish();

    // Slices the run array, round-trips the slice through its array data and
    // collects the logical values produced by the iterator.
    let collect_slice = |offset: usize, len: usize| -> Vec<Option<i32>> {
        let sliced: RunArray<Int16Type> = run_array.slice(offset, len).into_data().into();
        let typed = sliced.downcast::<PrimitiveArray<Int32Type>>().unwrap();
        typed.into_iter().collect()
    };

    for slice_len in 1..=total_len {
        // Prefix of the array: offset 0, `slice_len` elements.
        let actual = collect_slice(0, slice_len);
        let expected: Vec<Option<i32>> = input[..slice_len].to_vec();
        assert_eq!(expected, actual);

        // Suffix of the array: the last `slice_len` elements.
        let start = total_len - slice_len;
        let actual = collect_slice(start, slice_len);
        let expected: Vec<Option<i32>> = input[start..].to_vec();
        assert_eq!(expected, actual);
    }
}
}