use arrow_buffer::ArrowNativeType;
use crate::{array::ArrayAccessor, types::RunEndIndexType, Array, TypedRunArray};
/// Double-ended iterator over the logical elements of a [`TypedRunArray`].
///
/// Each logical position yields `Some(value)`, or `None` when the values
/// array reports a null at the corresponding physical position.
///
/// The iterator keeps a (logical, physical) cursor pair for each end; the
/// front pair is advanced by `next` and the back pair by `next_back`.
#[derive(Debug)]
pub struct RunArrayIter<'a, R, V>
where
    R: RunEndIndexType,
    V: Sync + Send,
    &'a V: ArrayAccessor,
    <&'a V as ArrayAccessor>::Item: Default,
{
    /// The typed run array being iterated.
    array: TypedRunArray<'a, R, V>,
    /// Logical index of the next element produced from the front.
    current_logical: usize,
    /// Index into the run ends / values used for the front cursor.
    current_physical: usize,
    /// Exclusive logical bound of the back cursor; iteration is finished
    /// once it equals `current_logical`.
    current_end_logical: usize,
    /// Index into the run ends / values used for the back cursor. It starts
    /// at the length of the values array and is stepped down by `next_back`.
    current_end_physical: usize,
}
impl<'a, R, V> RunArrayIter<'a, R, V>
where
    R: RunEndIndexType,
    V: Sync + Send,
    &'a V: ArrayAccessor,
    <&'a V as ArrayAccessor>::Item: Default,
{
    /// Creates an iterator covering every logical element of `array`.
    ///
    /// The front cursors start at zero; the back cursors start at the
    /// array's length and at the length of its values array respectively.
    pub fn new(array: TypedRunArray<'a, R, V>) -> Self {
        Self {
            current_logical: 0,
            current_physical: 0,
            // Both lengths are read before `array` is moved into the struct
            // (field initialisers are evaluated in the order written).
            current_end_logical: array.len(),
            current_end_physical: array.values().len(),
            array,
        }
    }
}
impl<'a, R, V> Iterator for RunArrayIter<'a, R, V>
where
    R: RunEndIndexType,
    V: Sync + Send,
    &'a V: ArrayAccessor,
    <&'a V as ArrayAccessor>::Item: Default,
{
    type Item = Option<<&'a V as ArrayAccessor>::Item>;

    /// Yields the element at the front logical cursor and advances it.
    ///
    /// Returns `None` once the front cursor has met the back cursor.
    #[inline]
    fn next(&mut self) -> Option<Self::Item> {
        if self.current_logical == self.current_end_logical {
            return None;
        }

        // Step to the following run when the logical cursor has reached the
        // end of the current one. A single step is taken per call.
        let run_end = self
            .array
            .run_ends()
            .value(self.current_physical)
            .as_usize();
        if self.current_logical >= run_end {
            self.current_physical += 1;
        }

        // The logical cursor advances whether or not the value is null.
        self.current_logical += 1;

        let values = self.array.values();
        if values.is_null(self.current_physical) {
            return Some(None);
        }

        // SAFETY: relies on `current_physical` being a valid index into the
        // values array while the front cursor is below the back cursor.
        // NOTE(review): this presumes the run ends are strictly increasing
        // and cover the logical length; it is not checked here.
        let value = unsafe { values.value_unchecked(self.current_physical) };
        Some(Some(value))
    }

    /// Reports the exact number of logical elements still to be yielded.
    fn size_hint(&self) -> (usize, Option<usize>) {
        let remaining = self.current_end_logical - self.current_logical;
        (remaining, Some(remaining))
    }
}
impl<'a, R, V> DoubleEndedIterator for RunArrayIter<'a, R, V>
where
    R: RunEndIndexType,
    V: Sync + Send,
    &'a V: ArrayAccessor,
    <&'a V as ArrayAccessor>::Item: Default,
{
    /// Yields the element just before the back logical cursor and moves the
    /// cursor down by one.
    ///
    /// Returns `None` once the back cursor has met the front cursor.
    fn next_back(&mut self) -> Option<Self::Item> {
        if self.current_end_logical == self.current_logical {
            return None;
        }

        self.current_end_logical -= 1;

        // Step back one run when the new logical position lies before the
        // end of the preceding run. The `> 0` guard keeps the `- 1` from
        // underflowing once the first run has been reached.
        if self.current_end_physical > 0 {
            let previous_run_end = self
                .array
                .run_ends()
                .value(self.current_end_physical - 1)
                .as_usize();
            if self.current_end_logical < previous_run_end {
                self.current_end_physical -= 1;
            }
        }

        let values = self.array.values();
        if values.is_null(self.current_end_physical) {
            return Some(None);
        }

        // SAFETY: relies on `current_end_physical` being a valid index into
        // the values array after the adjustment above.
        // NOTE(review): this presumes the last run end exceeds every logical
        // index, so the initial one-past-the-end value is always stepped
        // down before use; it is not checked here.
        let value = unsafe { values.value_unchecked(self.current_end_physical) };
        Some(Some(value))
    }
}
/// Marker impl: `size_hint` returns identical lower and upper bounds, so the
/// default `len` provided by [`ExactSizeIterator`] is used as-is.
impl<'a, R, V> ExactSizeIterator for RunArrayIter<'a, R, V>
where
    R: RunEndIndexType,
    V: Sync + Send,
    &'a V: ArrayAccessor,
    <&'a V as ArrayAccessor>::Item: Default,
{
}
#[cfg(test)]
mod tests {
    use rand::{seq::SliceRandom, thread_rng, Rng};

    use crate::{
        array::{Int32Array, StringArray},
        builder::PrimitiveRunBuilder,
        types::Int32Type,
        Int64RunArray,
    };

    /// Builds exactly `size` optional values arranged in randomly sized runs.
    ///
    /// Values are drawn from a small seed list (three nulls plus `1..=9`)
    /// that is reshuffled every time it has been fully consumed. Each seed
    /// entry is repeated between one and `max_run_length` times.
    fn build_input_array(size: usize) -> Vec<Option<i32>> {
        let mut seed: Vec<Option<i32>> = vec![
            None,
            None,
            None,
            Some(1),
            Some(2),
            Some(3),
            Some(4),
            Some(5),
            Some(6),
            Some(7),
            Some(8),
            Some(9),
        ];
        let mut result: Vec<Option<i32>> = Vec::with_capacity(size);
        let mut rng = thread_rng();
        // Runs are at most 8 long, at most half the output, and at least 1.
        let max_run_length = 8_usize.min(1_usize.max(size / 2));
        let mut ix = 0;

        while result.len() < size {
            if ix == 0 {
                seed.shuffle(&mut rng);
            }
            // `gen_range` is already bounded by `max_run_length`, so no
            // further clamping is needed.
            let run_length = rng.gen_range(1..=max_run_length);
            result.extend(std::iter::repeat(seed[ix]).take(run_length));
            ix = (ix + 1) % seed.len();
        }

        // The final run may have overshot `size`; drop the excess.
        result.truncate(size);
        result
    }

    /// Forward and reverse collection must reproduce the builder's input.
    #[test]
    fn test_primitive_array_iter_round_trip() {
        let input_vec = vec![
            Some(32),
            Some(32),
            None,
            Some(64),
            Some(64),
            Some(64),
            Some(72),
        ];
        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
        builder.extend(input_vec.clone());
        let ree_array = builder.finish();
        let ree_array = ree_array.downcast::<Int32Array>().unwrap();

        let output_vec: Vec<Option<i32>> = ree_array.into_iter().collect();
        assert_eq!(input_vec, output_vec);

        let rev_output_vec: Vec<Option<i32>> = ree_array.into_iter().rev().collect();
        let reversed_input: Vec<Option<i32>> = input_vec.into_iter().rev().collect();
        assert_eq!(reversed_input, rev_output_vec);
    }

    /// Alternating `next` / `next_back` calls must meet in the middle and
    /// then report exhaustion from both ends.
    #[test]
    fn test_double_ended() {
        let input_vec = vec![
            Some(32),
            Some(32),
            None,
            Some(64),
            Some(64),
            Some(64),
            Some(72),
        ];
        let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
        builder.extend(input_vec);
        let ree_array = builder.finish();
        let ree_array = ree_array.downcast::<Int32Array>().unwrap();

        let mut iter = ree_array.into_iter();
        assert_eq!(Some(Some(32)), iter.next());
        assert_eq!(Some(Some(72)), iter.next_back());
        assert_eq!(Some(Some(32)), iter.next());
        assert_eq!(Some(Some(64)), iter.next_back());
        assert_eq!(Some(None), iter.next());
        assert_eq!(Some(Some(64)), iter.next_back());
        assert_eq!(Some(Some(64)), iter.next());
        assert_eq!(None, iter.next_back());
        assert_eq!(None, iter.next());
    }

    /// Checks forward and reverse iteration element by element against
    /// randomly generated inputs of several lengths.
    #[test]
    fn test_run_iterator_comprehensive() {
        for logical_len in [1_usize, 2, 3, 4, 15, 16, 17, 63, 64, 65] {
            let input_array = build_input_array(logical_len);

            let mut run_array_builder =
                PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
            run_array_builder.extend(input_array.iter().copied());
            let run_array = run_array_builder.finish();
            let typed_array = run_array.downcast::<Int32Array>().unwrap();

            // Forward pass: every input element, then exhaustion.
            let mut run_array_iter = typed_array.into_iter();
            for expected in input_array.iter().copied() {
                assert_eq!(Some(expected), run_array_iter.next());
            }
            assert_eq!(None, run_array_iter.next());

            // Reverse pass: every input element back to front, then exhaustion.
            let mut run_array_iter = typed_array.into_iter().rev();
            for expected in input_array.iter().rev().copied() {
                assert_eq!(Some(expected), run_array_iter.next());
            }
            assert_eq!(None, run_array_iter.next());
        }
    }

    /// Iterating a string-valued run array yields items that can be mapped
    /// into owned strings.
    #[test]
    fn test_string_array_iter_round_trip() {
        let input_vec = vec!["ab", "ab", "ba", "cc", "cc"];
        let input_ree_array: Int64RunArray = input_vec.into_iter().collect();
        let string_ree_array = input_ree_array.downcast::<StringArray>().unwrap();

        // Append a 'b' to every value to show the items are usable strings.
        let result: Vec<Option<String>> = string_ree_array
            .into_iter()
            .map(|item| item.map(|s| format!("{}b", s)))
            .collect();
        let result_asref: Vec<Option<&str>> =
            result.iter().map(|f| f.as_deref()).collect();

        let expected_vec = vec![
            Some("abb"),
            Some("abb"),
            Some("bab"),
            Some("ccb"),
            Some("ccb"),
        ];
        assert_eq!(expected_vec, result_asref);
    }
}