use std::any::Any;
use std::sync::Arc;
use arrow_buffer::{ArrowNativeType, BooleanBufferBuilder, NullBuffer, RunEndBuffer};
use arrow_data::{ArrayData, ArrayDataBuilder};
use arrow_schema::{ArrowError, DataType, Field};
use crate::{
    builder::StringRunBuilder,
    make_array,
    run_iterator::RunArrayIter,
    types::{Int16Type, Int32Type, Int64Type, RunEndIndexType},
    Array, ArrayAccessor, ArrayRef, PrimitiveArray,
};
/// A run-end encoded array, generic over the run-end index type `R`.
///
/// The array is described by a buffer of run ends and a child array of values.
/// Its length and offset are the ones reported by the run-end buffer.
pub struct RunArray<R: RunEndIndexType> {
    /// Data type reported by `Array::data_type` for this array.
    data_type: DataType,
    /// Run ends; also the source of this array's `len()` and `offset()`.
    run_ends: RunEndBuffer<R::Native>,
    /// Child array of values, addressed by physical index.
    values: ArrayRef,
}
impl<R: RunEndIndexType> Clone for RunArray<R> {
    fn clone(&self) -> Self {
        Self {
            data_type: self.data_type.clone(),
            run_ends: self.run_ends.clone(),
            values: self.values.clone(),
        }
    }
}
impl<R: RunEndIndexType> RunArray<R> {
    /// Returns the logical length described by `run_ends`.
    ///
    /// This is the last element of `run_ends` converted to `usize`, or `0` when
    /// `run_ends` is empty.
    pub fn logical_len(run_ends: &PrimitiveArray<R>) -> usize {
        match run_ends.len() {
            0 => 0,
            count => run_ends.value(count - 1).as_usize(),
        }
    }

    /// Attempts to build a [`RunArray`] from `run_ends` and `values`.
    ///
    /// The resulting data type is `DataType::RunEndEncoded` with a non-nullable
    /// `"run_ends"` field and a nullable `"values"` field, and the array length is
    /// [`Self::logical_len`] of `run_ends`.
    ///
    /// # Errors
    ///
    /// Returns whatever error `ArrayData::validate_data` reports for the assembled
    /// data.
    pub fn try_new(run_ends: &PrimitiveArray<R>, values: &dyn Array) -> Result<Self, ArrowError> {
        let run_ends_field = Field::new("run_ends", run_ends.data_type().clone(), false);
        let values_field = Field::new("values", values.data_type().clone(), true);
        let encoded_type =
            DataType::RunEndEncoded(Arc::new(run_ends_field), Arc::new(values_field));

        let logical_len = Self::logical_len(run_ends);
        let builder = ArrayDataBuilder::new(encoded_type)
            .len(logical_len)
            .add_child_data(run_ends.to_data())
            .add_child_data(values.to_data());

        // SAFETY: the data is built without validation here, but `validate_data` is
        // called immediately afterwards and its error is returned before the data
        // is turned into a `RunArray`.
        let array_data = unsafe { builder.build_unchecked() };
        array_data.validate_data()?;
        Ok(Self::from(array_data))
    }

    /// Returns a reference to the run-end buffer.
    pub fn run_ends(&self) -> &RunEndBuffer<R::Native> {
        &self.run_ends
    }

    /// Returns a reference to the child values array.
    pub fn values(&self) -> &ArrayRef {
        &self.values
    }

    /// Returns the start physical index, as reported by the run-end buffer.
    pub fn get_start_physical_index(&self) -> usize {
        self.run_ends.get_start_physical_index()
    }

    /// Returns the end physical index, as reported by the run-end buffer.
    pub fn get_end_physical_index(&self) -> usize {
        self.run_ends.get_end_physical_index()
    }

    /// Returns a [`TypedRunArray`] view if the values array is of concrete type `V`,
    /// or `None` if the downcast fails.
    pub fn downcast<V: 'static>(&self) -> Option<TypedRunArray<'_, R, V>> {
        self.values
            .as_any()
            .downcast_ref::<V>()
            .map(|values| TypedRunArray {
                run_array: self,
                values,
            })
    }

    /// Maps `logical_index` to a physical index by delegating to the run-end buffer.
    pub fn get_physical_index(&self, logical_index: usize) -> usize {
        self.run_ends.get_physical_index(logical_index)
    }

    /// Maps every entry of `logical_indices` to its physical index.
    ///
    /// The result has the same length and order as `logical_indices`. Internally
    /// the positions of `logical_indices` are sorted by value so that the run ends
    /// only need to be walked once.
    ///
    /// # Errors
    ///
    /// Returns `ArrowError::InvalidArgumentError` if any logical index is not
    /// smaller than the length of this array.
    ///
    /// # Panics
    ///
    /// Panics if two logical indices cannot be ordered by `partial_cmp`.
    #[inline]
    pub fn get_physical_indices<I>(&self, logical_indices: &[I]) -> Result<Vec<usize>, ArrowError>
    where
        I: ArrowNativeType,
    {
        let array_len = self.run_ends().len();
        let array_offset = self.run_ends().offset();
        let requested = logical_indices.len();
        if requested == 0 {
            return Ok(vec![]);
        }

        // Positions into `logical_indices`, ordered by the logical index they refer to.
        let mut sorted_positions: Vec<usize> = (0..requested).collect();
        sorted_positions.sort_unstable_by(|a, b| {
            logical_indices[*a]
                .partial_cmp(&logical_indices[*b])
                .unwrap()
        });

        // Fail fast when the largest requested index is already out of range.
        let last_position = *sorted_positions.last().unwrap();
        let largest_logical_index = logical_indices[last_position].as_usize();
        if largest_logical_index >= array_len {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Cannot convert all logical indices to physical indices. The logical index cannot be converted is {largest_logical_index}.",
            )));
        }

        // Runs before the start physical index are skipped.
        let first_physical = self.get_start_physical_index();
        let mut resolved = vec![0; requested];
        let mut cursor = 0_usize;
        let runs = self
            .run_ends
            .values()
            .iter()
            .enumerate()
            .skip(first_physical);
        for (physical_index, run_end) in runs {
            // Run end expressed relative to this array's offset.
            let relative_end = run_end.as_usize() - array_offset;
            // Every pending logical index below the current run end maps to this run.
            while cursor < requested {
                let position = sorted_positions[cursor];
                if logical_indices[position].as_usize() >= relative_end {
                    break;
                }
                resolved[position] = physical_index;
                cursor += 1;
            }
        }

        // Anything left over could not be placed in any run.
        if cursor < requested {
            let logical_index = logical_indices[sorted_positions[cursor]].as_usize();
            return Err(ArrowError::InvalidArgumentError(format!(
                "Cannot convert all logical indices to physical indices. The logical index cannot be converted is {logical_index}.",
            )));
        }
        Ok(resolved)
    }

    /// Returns a slice of this array covering `length` logical elements starting at
    /// `offset`.
    ///
    /// Only the run-end buffer is sliced; the values array is shared unchanged.
    pub fn slice(&self, offset: usize, length: usize) -> Self {
        let data_type = self.data_type.clone();
        let run_ends = self.run_ends.slice(offset, length);
        let values = self.values.clone();
        Self {
            data_type,
            run_ends,
            values,
        }
    }
}
impl<R: RunEndIndexType> From<ArrayData> for RunArray<R> {
    /// Converts `data` into a [`RunArray`].
    ///
    /// The first child of `data` supplies the run ends and the second child the
    /// values; the offset and length of `data` are applied to the run-end buffer.
    ///
    /// # Panics
    ///
    /// Panics if the data type of `data` is not `DataType::RunEndEncoded`, if the
    /// run-ends child does not have type `R::DATA_TYPE`, or if an expected child or
    /// buffer is missing (indexing is unchecked).
    fn from(data: ArrayData) -> Self {
        if !matches!(data.data_type(), DataType::RunEndEncoded(_, _)) {
            panic!("Invalid data type for RunArray. The data type should be DataType::RunEndEncoded");
        }

        let children = data.child_data();
        let run_ends_data = &children[0];
        assert_eq!(
            run_ends_data.data_type(),
            &R::DATA_TYPE,
            "Incorrect run ends type"
        );

        // SAFETY: NOTE(review): `new_unchecked` performs no checks on the run ends;
        // this presumably relies on `data` having been validated by the caller.
        let run_ends = unsafe {
            let scalar = run_ends_data.buffers()[0].clone().into();
            RunEndBuffer::new_unchecked(scalar, data.offset(), data.len())
        };
        let values = make_array(children[1].clone());
        let data_type = data.data_type().clone();
        Self {
            data_type,
            run_ends,
            values,
        }
    }
}
impl<R: RunEndIndexType> From<RunArray<R>> for ArrayData {
    fn from(array: RunArray<R>) -> Self {
        let len = array.run_ends.len();
        let offset = array.run_ends.offset();
        let run_ends = ArrayDataBuilder::new(R::DATA_TYPE)
            .len(array.run_ends.values().len())
            .buffers(vec![array.run_ends.into_inner().into_inner()]);
        let run_ends = unsafe { run_ends.build_unchecked() };
        let builder = ArrayDataBuilder::new(array.data_type)
            .len(len)
            .offset(offset)
            .child_data(vec![run_ends, array.values.to_data()]);
        unsafe { builder.build_unchecked() }
    }
}
impl<T: RunEndIndexType> Array for RunArray<T> {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn to_data(&self) -> ArrayData {
        self.clone().into()
    }
    fn into_data(self) -> ArrayData {
        self.into()
    }
    fn data_type(&self) -> &DataType {
        &self.data_type
    }
    fn slice(&self, offset: usize, length: usize) -> ArrayRef {
        Arc::new(self.slice(offset, length))
    }
    fn len(&self) -> usize {
        self.run_ends.len()
    }
    fn is_empty(&self) -> bool {
        self.run_ends.is_empty()
    }
    fn offset(&self) -> usize {
        self.run_ends.offset()
    }
    fn nulls(&self) -> Option<&NullBuffer> {
        None
    }
    fn logical_nulls(&self) -> Option<NullBuffer> {
        let len = self.len();
        let nulls = self.values.logical_nulls()?;
        let mut out = BooleanBufferBuilder::new(len);
        let offset = self.run_ends.offset();
        let mut valid_start = 0;
        let mut last_end = 0;
        for (idx, end) in self.run_ends.values().iter().enumerate() {
            let end = end.as_usize();
            if end < offset {
                continue;
            }
            let end = (end - offset).min(len);
            if nulls.is_null(idx) {
                if valid_start < last_end {
                    out.append_n(last_end - valid_start, true);
                }
                out.append_n(end - last_end, false);
                valid_start = end;
            }
            last_end = end;
            if end == len {
                break;
            }
        }
        if valid_start < len {
            out.append_n(len - valid_start, true)
        }
        assert_eq!(out.len(), len);
        Some(out.finish().into())
    }
    fn is_nullable(&self) -> bool {
        !self.is_empty() && self.values.is_nullable()
    }
    fn get_buffer_memory_size(&self) -> usize {
        self.run_ends.inner().inner().capacity() + self.values.get_buffer_memory_size()
    }
    fn get_array_memory_size(&self) -> usize {
        std::mem::size_of::<Self>()
            + self.run_ends.inner().inner().capacity()
            + self.values.get_array_memory_size()
    }
}
impl<R: RunEndIndexType> std::fmt::Debug for RunArray<R> {
    /// Writes the run ends and the values array, followed by a newline.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let run_ends = self.run_ends.values();
        let values = &self.values;
        writeln!(
            f,
            "RunArray {{run_ends: {:?}, values: {:?}}}",
            run_ends, values
        )
    }
}
impl<'a, T: RunEndIndexType> FromIterator<Option<&'a str>> for RunArray<T> {
    /// Builds a string run array from optional string slices, where `None` is
    /// appended as a null.
    fn from_iter<I: IntoIterator<Item = Option<&'a str>>>(iter: I) -> Self {
        let items = iter.into_iter();
        // Size the builder from the iterator's lower size bound.
        // NOTE(review): 256 is passed as the builder's second capacity argument;
        // presumably the initial byte capacity for string data — confirm.
        let capacity = items.size_hint().0;
        let mut builder = StringRunBuilder::with_capacity(capacity, 256);
        for item in items {
            builder.append_option(item);
        }
        builder.finish()
    }
}
impl<'a, T: RunEndIndexType> FromIterator<&'a str> for RunArray<T> {
    /// Builds a string run array from string slices; every item is appended as a
    /// non-null value.
    fn from_iter<I: IntoIterator<Item = &'a str>>(iter: I) -> Self {
        let items = iter.into_iter();
        // Size the builder from the iterator's lower size bound.
        // NOTE(review): 256 is passed as the builder's second capacity argument;
        // presumably the initial byte capacity for string data — confirm.
        let capacity = items.size_hint().0;
        let mut builder = StringRunBuilder::with_capacity(capacity, 256);
        for item in items {
            builder.append_value(item);
        }
        builder.finish()
    }
}
/// A [`RunArray`] whose run ends are indexed with [`Int16Type`].
pub type Int16RunArray = RunArray<Int16Type>;
/// A [`RunArray`] whose run ends are indexed with [`Int32Type`].
pub type Int32RunArray = RunArray<Int32Type>;
/// A [`RunArray`] whose run ends are indexed with [`Int64Type`].
pub type Int64RunArray = RunArray<Int64Type>;
/// A borrowed view of a [`RunArray`] together with its values array downcast to
/// the concrete type `V`.
///
/// Obtained from [`RunArray::downcast`].
pub struct TypedRunArray<'a, R: RunEndIndexType, V> {
    /// The run array this view borrows from.
    run_array: &'a RunArray<R>,
    /// The run array's values, downcast to `V`.
    values: &'a V,
}
// Implemented by hand (rather than derived) so that no `Clone` bound is placed on `V`.
impl<R: RunEndIndexType, V> Clone for TypedRunArray<'_, R, V> {
    /// Copies the view; it only holds two references.
    fn clone(&self) -> Self {
        *self
    }
}
// The view consists solely of shared references, so it is `Copy` regardless of `V`.
impl<R: RunEndIndexType, V> Copy for TypedRunArray<'_, R, V> {}
impl<R: RunEndIndexType, V> std::fmt::Debug for TypedRunArray<'_, R, V> {
    /// Writes the debug form of the underlying run array wrapped in
    /// `TypedRunArray(...)`, followed by a newline.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let inner = self.run_array;
        writeln!(f, "TypedRunArray({:?})", inner)
    }
}
impl<'a, R: RunEndIndexType, V> TypedRunArray<'a, R, V> {
    /// Returns the run-end buffer of the underlying run array.
    pub fn run_ends(&self) -> &'a RunEndBuffer<R::Native> {
        let run_array: &'a RunArray<R> = self.run_array;
        run_array.run_ends()
    }

    /// Returns the values array as its concrete type `V`.
    pub fn values(&self) -> &'a V {
        let values: &'a V = self.values;
        values
    }

    /// Returns the underlying, untyped run array.
    pub fn run_array(&self) -> &'a RunArray<R> {
        let run_array: &'a RunArray<R> = self.run_array;
        run_array
    }
}
// Every method forwards to the underlying run array; the typed values reference is
// not consulted here.
impl<R: RunEndIndexType, V: Sync> Array for TypedRunArray<'_, R, V> {
    /// Returns the underlying [`RunArray`] (not the typed view) as `&dyn Any`.
    fn as_any(&self) -> &dyn Any {
        self.run_array
    }
    /// Returns the [`ArrayData`] of the underlying run array.
    fn to_data(&self) -> ArrayData {
        self.run_array.to_data()
    }
    /// Returns the [`ArrayData`] of the underlying run array.
    ///
    /// NOTE(review): `run_array` is a shared reference, so this goes through the
    /// `Array` implementation for references declared elsewhere — confirm it
    /// behaves like `to_data`.
    fn into_data(self) -> ArrayData {
        self.run_array.into_data()
    }
    /// Returns the data type of the underlying run array.
    fn data_type(&self) -> &DataType {
        self.run_array.data_type()
    }
    /// Slices the underlying run array; the result is an untyped [`ArrayRef`].
    fn slice(&self, offset: usize, length: usize) -> ArrayRef {
        Arc::new(self.run_array.slice(offset, length))
    }
    /// Returns the length of the underlying run array.
    fn len(&self) -> usize {
        self.run_array.len()
    }
    /// Returns whether the underlying run array is empty.
    fn is_empty(&self) -> bool {
        self.run_array.is_empty()
    }
    /// Returns the offset of the underlying run array.
    fn offset(&self) -> usize {
        self.run_array.offset()
    }
    /// Returns the null buffer of the underlying run array.
    fn nulls(&self) -> Option<&NullBuffer> {
        self.run_array.nulls()
    }
    /// Returns the logical nulls of the underlying run array.
    fn logical_nulls(&self) -> Option<NullBuffer> {
        self.run_array.logical_nulls()
    }
    /// Returns the logical null count of the underlying run array.
    fn logical_null_count(&self) -> usize {
        self.run_array.logical_null_count()
    }
    /// Returns whether the underlying run array is nullable.
    fn is_nullable(&self) -> bool {
        self.run_array.is_nullable()
    }
    /// Returns the buffer memory size of the underlying run array.
    fn get_buffer_memory_size(&self) -> usize {
        self.run_array.get_buffer_memory_size()
    }
    /// Returns the array memory size of the underlying run array.
    fn get_array_memory_size(&self) -> usize {
        self.run_array.get_array_memory_size()
    }
}
impl<'a, R, V> ArrayAccessor for TypedRunArray<'a, R, V>
where
    R: RunEndIndexType,
    V: Sync + Send,
    &'a V: ArrayAccessor,
    <&'a V as ArrayAccessor>::Item: Default,
{
    type Item = <&'a V as ArrayAccessor>::Item;

    /// Returns the value at `logical_index`.
    ///
    /// # Panics
    ///
    /// Panics if `logical_index` is not smaller than the length of this array.
    fn value(&self, logical_index: usize) -> Self::Item {
        let len = self.len();
        assert!(
            logical_index < len,
            "Trying to access an element at index {} from a TypedRunArray of length {}",
            logical_index,
            len
        );
        // SAFETY: `logical_index` was bounds-checked against `len` just above.
        unsafe { self.value_unchecked(logical_index) }
    }

    /// Returns the value at `logical_index` without checking that it is in bounds.
    ///
    /// The logical index is first mapped to a physical index, which is then used to
    /// read from the typed values array.
    ///
    /// # Safety
    ///
    /// NOTE(review): no bounds check is performed here; callers are presumably
    /// required to pass `logical_index < self.len()`, as `value` does.
    unsafe fn value_unchecked(&self, logical_index: usize) -> Self::Item {
        let typed_values = self.values();
        let physical_index = self.run_array.get_physical_index(logical_index);
        typed_values.value_unchecked(physical_index)
    }
}
/// Iterating a [`TypedRunArray`] yields `Option` items of the values array's
/// accessor item type, via [`RunArrayIter`].
impl<'a, R, V> IntoIterator for TypedRunArray<'a, R, V>
where
    R: RunEndIndexType,
    V: Sync + Send,
    &'a V: ArrayAccessor,
    <&'a V as ArrayAccessor>::Item: Default,
{
    type Item = Option<<&'a V as ArrayAccessor>::Item>;
    type IntoIter = RunArrayIter<'a, R, V>;
    /// Wraps this view in a [`RunArrayIter`].
    fn into_iter(self) -> Self::IntoIter {
        RunArrayIter::new(self)
    }
}
#[cfg(test)]
mod tests {
    use rand::seq::SliceRandom;
    use rand::thread_rng;
    use rand::Rng;
    use super::*;
    use crate::builder::PrimitiveRunBuilder;
    use crate::cast::AsArray;
    use crate::types::{Int8Type, UInt32Type};
    use crate::{Int32Array, StringArray};
    /// Builds a random logical array of exactly `size` elements made of runs.
    ///
    /// Run values are drawn from a seed of three `None`s and `Some(1)..=Some(9)`,
    /// which is reshuffled each time it has been fully consumed. Each run length is
    /// random between 1 and `min(8, max(1, size / 2))`.
    fn build_input_array(size: usize) -> Vec<Option<i32>> {
        let mut seed: Vec<Option<i32>> = [None, None, None]
            .into_iter()
            .chain((1..=9).map(Some))
            .collect();
        let max_run_length = (size / 2).clamp(1, 8);

        let mut rng = thread_rng();
        let mut result: Vec<Option<i32>> = Vec::with_capacity(size);
        let mut ix = 0;
        while result.len() < size {
            // Reshuffle whenever a fresh pass over the seed starts.
            if ix == 0 {
                seed.shuffle(&mut rng);
            }
            let num = rng.gen_range(1..=max_run_length);
            result.extend(std::iter::repeat(seed[ix]).take(num));
            ix = (ix + 1) % seed.len();
        }
        // The last run may overshoot; trim back to the requested size.
        result.resize(size, None);
        result
    }
    /// Asserts that each physical index points at the same value (or null) in
    /// `physical_array` as the matching logical index does in `logical_array`.
    fn compare_logical_and_physical_indices(
        logical_indices: &[u32],
        logical_array: &[Option<i32>],
        physical_indices: &[usize],
        physical_array: &PrimitiveArray<Int32Type>,
    ) {
        assert_eq!(logical_indices.len(), physical_indices.len());
        for (logical_ix, physical_ix) in logical_indices.iter().zip(physical_indices) {
            let logical_ix = logical_ix.as_usize();
            let physical_ix = *physical_ix;
            match logical_array[logical_ix] {
                Some(val) => {
                    assert!(physical_array.is_valid(physical_ix));
                    let actual = physical_array.value(physical_ix);
                    assert_eq!(val, actual);
                }
                None => {
                    assert!(physical_array.is_null(physical_ix))
                }
            }
        }
    }
    /// Builds a run array from explicit run ends and values and checks its length,
    /// null count, values child and run-end buffer.
    #[test]
    fn test_run_array() {
        let run_ends_values = [4_i16, 6, 7, 9, 13, 18, 20, 22];
        let run_ends_data =
            PrimitiveArray::<Int16Type>::from_iter_values(run_ends_values.iter().copied());
        let value_data =
            PrimitiveArray::<Int8Type>::from_iter_values([10_i8, 11, 12, 13, 14, 15, 16, 17]);

        let ree_array = RunArray::<Int16Type>::try_new(&run_ends_data, &value_data).unwrap();
        assert_eq!(ree_array.len(), 22);
        assert_eq!(ree_array.null_count(), 0);

        let child_values = ree_array.values();
        assert_eq!(value_data.into_data(), child_values.to_data());
        assert_eq!(&DataType::Int8, child_values.data_type());

        let run_end_buffer = ree_array.run_ends();
        assert_eq!(run_end_buffer.values(), &run_ends_values);
    }
    /// Checks the `Debug` output for an array with a null run and for an array
    /// consisting of a single long run.
    #[test]
    fn test_run_array_fmt_debug() {
        // Three runs of length one, the middle one null.
        let mut mixed_builder = PrimitiveRunBuilder::<Int16Type, UInt32Type>::with_capacity(3);
        mixed_builder.append_value(12345678);
        mixed_builder.append_null();
        mixed_builder.append_value(22345678);
        let mixed = mixed_builder.finish();
        let rendered = format!("{mixed:?}");
        assert_eq!(
            "RunArray {run_ends: [1, 2, 3], values: PrimitiveArray<UInt32>\n[\n  12345678,\n  null,\n  22345678,\n]}\n",
            rendered
        );

        // Twenty identical values collapse into a single run.
        let mut repeated_builder =
            PrimitiveRunBuilder::<Int16Type, UInt32Type>::with_capacity(20);
        for _ in 0..20 {
            repeated_builder.append_value(1);
        }
        let repeated = repeated_builder.finish();
        assert_eq!(repeated.len(), 20);
        assert_eq!(repeated.null_count(), 0);
        assert_eq!(repeated.logical_null_count(), 0);
        let rendered = format!("{repeated:?}");
        assert_eq!(
            "RunArray {run_ends: [20], values: PrimitiveArray<UInt32>\n[\n  1,\n]}\n",
            rendered
        );
    }
    /// Collects string iterators (with and without nulls) into a run array and
    /// checks the resulting runs.
    #[test]
    fn test_run_array_from_iter() {
        let test = vec!["a", "a", "b", "c"];

        // Replace "b" with a null.
        let with_null: RunArray<Int16Type> = test
            .iter()
            .map(|&x| Some(x).filter(|s| *s != "b"))
            .collect();
        let rendered = format!("{with_null:?}");
        assert_eq!(
            "RunArray {run_ends: [2, 3, 4], values: StringArray\n[\n  \"a\",\n  null,\n  \"c\",\n]}\n",
            rendered
        );
        assert_eq!(with_null.len(), 4);
        assert_eq!(with_null.null_count(), 0);
        assert_eq!(with_null.logical_null_count(), 1);

        let without_null: RunArray<Int16Type> = test.into_iter().collect();
        let rendered = format!("{without_null:?}");
        assert_eq!(
            "RunArray {run_ends: [2, 3, 4], values: StringArray\n[\n  \"a\",\n  \"b\",\n  \"c\",\n]}\n",
            rendered
        );
    }
    /// Four distinct consecutive values produce four runs of length one.
    #[test]
    fn test_run_array_run_ends_as_primitive_array() {
        let array: RunArray<Int16Type> = vec!["a", "b", "c", "a"].into_iter().collect();

        assert_eq!(array.len(), 4);
        assert_eq!(array.null_count(), 0);
        assert_eq!(array.logical_null_count(), 0);

        let run_end_buffer = array.run_ends();
        assert_eq!(&[1, 2, 3, 4], run_end_buffer.values());
    }
    /// Consecutive nulls are merged into one run; logical nulls count every element.
    #[test]
    fn test_run_array_as_primitive_array_with_null() {
        let input = vec![Some("a"), None, Some("b"), None, None, Some("a")];
        let array: RunArray<Int32Type> = input.into_iter().collect();

        assert_eq!(array.len(), 6);
        assert_eq!(array.null_count(), 0);
        assert_eq!(array.logical_null_count(), 3);

        let run_end_buffer = array.run_ends();
        assert_eq!(&[1, 2, 3, 5, 6], run_end_buffer.values());

        let child_values = array.values();
        assert_eq!(2, child_values.null_count());
        assert_eq!(5, child_values.len());
    }
    /// An all-null input becomes a single null run.
    #[test]
    fn test_run_array_all_nulls() {
        let input = vec![None, None, None];
        let array: RunArray<Int32Type> = input.into_iter().collect();

        assert_eq!(array.len(), 3);
        assert_eq!(array.null_count(), 0);
        assert_eq!(array.logical_null_count(), 3);

        let run_end_buffer = array.run_ends();
        assert_eq!(3, run_end_buffer.len());
        assert_eq!(&[3], run_end_buffer.values());

        let child_values = array.values();
        assert_eq!(1, child_values.null_count());
    }
    /// `try_new` accepts matching run ends and values and preserves the null value.
    #[test]
    fn test_run_array_try_new() {
        let run_ends: Int32Array = [Some(1), Some(2), Some(3), Some(4)].into_iter().collect();
        let values: StringArray = [Some("foo"), Some("bar"), None, Some("baz")]
            .into_iter()
            .collect();

        let array = RunArray::<Int32Type>::try_new(&run_ends, &values).unwrap();

        assert_eq!(array.values().data_type(), &DataType::Utf8);
        assert_eq!(array.null_count(), 0);
        assert_eq!(array.logical_null_count(), 1);
        assert_eq!(array.len(), 4);
        assert_eq!(array.values().null_count(), 1);

        let rendered = format!("{array:?}");
        assert_eq!(
            "RunArray {run_ends: [1, 2, 3, 4], values: StringArray\n[\n  \"foo\",\n  \"bar\",\n  null,\n  \"baz\",\n]}\n",
            rendered
        );
    }
    /// The `Int16RunArray` alias can be collected into and merges repeated values.
    #[test]
    fn test_run_array_int16_type_definition() {
        let array: Int16RunArray = vec!["a", "a", "b", "c", "c"].into_iter().collect();
        let expected_values: Arc<dyn Array> = Arc::new(StringArray::from(vec!["a", "b", "c"]));

        assert_eq!(array.run_ends().values(), &[2, 3, 5]);
        assert_eq!(array.values(), &expected_values);
    }
    /// Empty strings are regular values and form their own run.
    #[test]
    fn test_run_array_empty_string() {
        let array: Int16RunArray = vec!["a", "a", "", "", "c"].into_iter().collect();
        let expected_values: Arc<dyn Array> = Arc::new(StringArray::from(vec!["a", "", "c"]));

        assert_eq!(array.run_ends().values(), &[2, 4, 5]);
        assert_eq!(array.values(), &expected_values);
    }
    /// `try_new` rejects run ends and values of different lengths.
    #[test]
    fn test_run_array_length_mismatch() {
        let run_ends: Int32Array = [Some(1), Some(2), Some(3)].into_iter().collect();
        let values: StringArray = [Some("foo"), Some("bar"), None, Some("baz")]
            .into_iter()
            .collect();

        let result = RunArray::<Int32Type>::try_new(&run_ends, &values);

        let expected = ArrowError::InvalidArgumentError("The run_ends array length should be the same as values array length. Run_ends array length is 3, values array length is 4".to_string());
        assert_eq!(expected.to_string(), result.err().unwrap().to_string());
    }
    /// `try_new` rejects run ends that contain a null.
    #[test]
    fn test_run_array_run_ends_with_null() {
        let run_ends: Int32Array = [Some(1), None, Some(3)].into_iter().collect();
        let values: StringArray = [Some("foo"), Some("bar"), Some("baz")]
            .into_iter()
            .collect();

        let result = RunArray::<Int32Type>::try_new(&run_ends, &values);

        let expected = ArrowError::InvalidArgumentError(
            "Found null values in run_ends array. The run_ends array should not have null values."
                .to_string(),
        );
        assert_eq!(expected.to_string(), result.err().unwrap().to_string());
    }
    /// `try_new` rejects a run end of zero.
    #[test]
    fn test_run_array_run_ends_with_zeroes() {
        let run_ends: Int32Array = [Some(0), Some(1), Some(3)].into_iter().collect();
        let values: StringArray = [Some("foo"), Some("bar"), Some("baz")]
            .into_iter()
            .collect();

        let result = RunArray::<Int32Type>::try_new(&run_ends, &values);

        let expected = ArrowError::InvalidArgumentError("The values in run_ends array should be strictly positive. Found value 0 at index 0 that does not match the criteria.".to_string());
        assert_eq!(expected.to_string(), result.err().unwrap().to_string());
    }
    /// `try_new` rejects run ends that are not strictly increasing.
    #[test]
    fn test_run_array_run_ends_non_increasing() {
        let run_ends: Int32Array = [Some(1), Some(4), Some(4)].into_iter().collect();
        let values: StringArray = [Some("foo"), Some("bar"), Some("baz")]
            .into_iter()
            .collect();

        let result = RunArray::<Int32Type>::try_new(&run_ends, &values);

        let expected = ArrowError::InvalidArgumentError("The values in run_ends array should be strictly increasing. Found value 4 at index 2 with previous value 4 that does not match the criteria.".to_string());
        assert_eq!(expected.to_string(), result.err().unwrap().to_string());
    }
    /// Converting the data of an `Int32` run array into an `Int64` run array panics.
    #[test]
    #[should_panic(expected = "Incorrect run ends type")]
    fn test_run_array_run_ends_data_type_mismatch() {
        let int32_array = RunArray::<Int32Type>::from_iter(["32"]);
        let int32_data = int32_array.into_data();
        let _ = RunArray::<Int64Type>::from(int32_data);
    }
    /// The typed accessor returns the original value for every logical index; for
    /// null inputs the mapped physical value must be null.
    #[test]
    fn test_ree_array_accessor() {
        let input_array = build_input_array(256);
        let mut builder =
            PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
        builder.extend(input_array.iter().copied());
        let run_array = builder.finish();
        let typed = run_array.downcast::<PrimitiveArray<Int32Type>>().unwrap();

        for (logical_ix, input) in input_array.iter().enumerate() {
            match input {
                Some(expected) => {
                    let actual = typed.value(logical_ix);
                    assert_eq!(*expected, actual)
                }
                None => {
                    let physical_ix = run_array.get_physical_index(logical_ix);
                    assert!(typed.values().is_null(physical_ix));
                }
            }
        }
    }
    /// For several array sizes, every logical index (each requested twice, in
    /// random order) must map to a physical index holding the same value.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_get_physical_indices() {
        for logical_len in (0..250).step_by(10) {
            let input_array = build_input_array(logical_len);
            let mut builder = PrimitiveRunBuilder::<Int32Type, Int32Type>::new();
            builder.extend(input_array.iter().copied());
            let run_array = builder.finish();
            let physical_values_array = run_array.values().as_primitive::<Int32Type>();

            // Every logical index twice, shuffled.
            let index_count = logical_len as u32;
            let mut logical_indices: Vec<u32> =
                (0_u32..index_count).chain(0_u32..index_count).collect();
            let mut rng = thread_rng();
            logical_indices.shuffle(&mut rng);

            let physical_indices = run_array.get_physical_indices(&logical_indices).unwrap();
            assert_eq!(logical_indices.len(), physical_indices.len());
            compare_logical_and_physical_indices(
                &logical_indices,
                &input_array,
                &physical_indices,
                physical_values_array,
            );
        }
    }
    /// For every slice length, checks index mapping on a slice taken from the start
    /// and on a slice taken from the end of the same run array.
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_get_physical_indices_sliced() {
        let total_len = 80;
        let input_array = build_input_array(total_len);
        let mut builder =
            PrimitiveRunBuilder::<Int16Type, Int32Type>::with_capacity(input_array.len());
        builder.extend(input_array.iter().copied());
        let run_array = builder.finish();
        let physical_values_array = run_array.values().as_primitive::<Int32Type>();

        for slice_len in 1..=total_len {
            // Every logical index of the slice twice, shuffled.
            let index_count = slice_len as u32;
            let mut logical_indices: Vec<u32> =
                (0_u32..index_count).chain(0_u32..index_count).collect();
            let mut rng = thread_rng();
            logical_indices.shuffle(&mut rng);

            // First the leading slice, then the trailing slice of the same length.
            for start in [0, total_len - slice_len] {
                let sliced_input_array = &input_array[start..start + slice_len];
                // Round-trip through `ArrayData` so the offset is carried by the data.
                let sliced_run_array: RunArray<Int16Type> =
                    run_array.slice(start, slice_len).into_data().into();
                let physical_indices = sliced_run_array
                    .get_physical_indices(&logical_indices)
                    .unwrap();
                compare_logical_and_physical_indices(
                    &logical_indices,
                    sliced_input_array,
                    &physical_indices,
                    physical_values_array,
                );
            }
        }
    }
    /// Logical nulls are expanded per logical element, including for slices that
    /// start or end in the middle of a run.
    #[test]
    fn test_logical_nulls() {
        let run_ends = Int32Array::from(vec![3, 6, 9, 12]);
        let run_values = Int32Array::from(vec![Some(0), None, Some(1), None]);
        let array = RunArray::try_new(&run_ends, &run_values).unwrap();

        // Validity of each of the 12 logical elements (true = valid).
        let expected = [
            true, true, true, false, false, false, true, true, true, false, false, false,
        ];

        let full_nulls = array.logical_nulls().unwrap();
        assert_eq!(full_nulls.null_count(), 6);

        let slices = [(0, 12), (0, 2), (2, 5), (3, 0), (3, 3), (3, 4), (4, 8)];
        for (offset, length) in slices {
            let sliced = array.slice(offset, length);
            let n = sliced.logical_nulls().unwrap();
            let n = n.into_iter().collect::<Vec<_>>();
            assert_eq!(&n, &expected[offset..offset + length], "{offset} {length}");
        }
    }
}