use crate::{make_array, Array, ArrayRef};
use arrow_buffer::buffer::NullBuffer;
use arrow_buffer::{Buffer, ScalarBuffer};
use arrow_data::ArrayData;
use arrow_schema::{ArrowError, DataType, Field, UnionFields, UnionMode};
use std::any::Any;
use std::sync::Arc;
/// An array whose slots each take their value from one of several child arrays.
///
/// Every slot carries a type id that selects a child array. Arrays built from
/// [`UnionMode::Dense`] data additionally carry an offsets buffer that locates
/// the slot's value inside the selected child; [`UnionMode::Sparse`] arrays
/// have no offsets buffer.
#[derive(Clone)]
pub struct UnionArray {
    /// The `ArrayData` this array was created from.
    data: ArrayData,
    /// Type id of every slot, already restricted to this array's offset and length.
    type_ids: ScalarBuffer<i8>,
    /// Per-slot offsets into the selected child; `Some` only for dense unions.
    offsets: Option<ScalarBuffer<i32>>,
    /// Child arrays indexed by type id; ids without a declared field hold `None`.
    boxed_fields: Vec<Option<ArrayRef>>,
}
impl UnionArray {
    /// Assembles a [`UnionArray`] from its parts without validating them.
    ///
    /// The union is dense when `value_offsets` is supplied and sparse
    /// otherwise. `field_type_ids` are paired, in order, with the fields of
    /// `child_arrays` to form the union's data type.
    ///
    /// # Safety
    ///
    /// The underlying `ArrayData` is produced with `build_unchecked`, so none
    /// of the inputs are checked for consistency here. Callers should only
    /// pass inputs that [`Self::try_new`] would accept.
    pub unsafe fn new_unchecked(
        field_type_ids: &[i8],
        type_ids: Buffer,
        value_offsets: Option<Buffer>,
        child_arrays: Vec<(Field, ArrayRef)>,
    ) -> Self {
        // One type id per slot, so the buffer length is the array length.
        let len = type_ids.len();
        let mode = match &value_offsets {
            Some(_) => UnionMode::Dense,
            None => UnionMode::Sparse,
        };

        // Split the (field, array) pairs into the schema part and the data part.
        let mut fields = Vec::with_capacity(child_arrays.len());
        let mut children = Vec::with_capacity(child_arrays.len());
        for (field, array) in child_arrays {
            fields.push(field);
            children.push(array.into_data());
        }

        let data_type = DataType::Union(
            UnionFields::new(field_type_ids.iter().copied(), fields),
            mode,
        );
        let mut builder = ArrayData::builder(data_type)
            .add_buffer(type_ids)
            .child_data(children)
            .len(len);
        // The offsets buffer, when present, always follows the type id buffer.
        if let Some(offsets) = value_offsets {
            builder = builder.add_buffer(offsets);
        }
        Self::from(builder.build_unchecked())
    }
    /// Attempts to create a new [`UnionArray`], validating the inputs first.
    ///
    /// `field_type_ids` are paired, in order, with the entries of
    /// `child_arrays`. `type_ids` holds one `i8` per slot. `value_offsets`,
    /// when given, holds one `i32` per slot and makes the union dense;
    /// without it the union is sparse.
    ///
    /// # Errors
    ///
    /// Returns [`ArrowError::InvalidArgumentError`] when
    ///
    /// * `value_offsets` does not contain exactly one 32-bit offset per type id,
    /// * a type id is negative or is not one of the `field_type_ids` that has
    ///   a matching child array (the error text is kept unchanged from earlier
    ///   versions for compatibility),
    /// * an offset is negative or does not fall inside the child array
    ///   selected by the type id of the same slot.
    ///
    /// Any error reported by validating the assembled `ArrayData` is
    /// propagated as well.
    pub fn try_new(
        field_type_ids: &[i8],
        type_ids: Buffer,
        value_offsets: Option<Buffer>,
        child_arrays: Vec<(Field, ArrayRef)>,
    ) -> Result<Self, ArrowError> {
        if let Some(b) = &value_offsets {
            // Offsets are 32-bit, so a matching buffer holds four bytes per slot.
            if type_ids.len() * std::mem::size_of::<i32>() != b.len() {
                return Err(ArrowError::InvalidArgumentError(
                    "Type Ids and Offsets represent a different number of array slots."
                        .to_string(),
                ));
            }
        }

        // Length of the child registered under each non-negative type id.
        // `None` marks ids that no field declares. An `i8` id is at most 127,
        // so 128 entries cover every valid id.
        let mut child_lens: [Option<usize>; 128] = [None; 128];
        for (id, (_, child)) in field_type_ids.iter().zip(child_arrays.iter()) {
            if let Ok(slot) = usize::try_from(*id) {
                child_lens[slot] = Some(child.len());
            }
        }
        let child_len =
            |type_id: i8| usize::try_from(type_id).ok().and_then(|slot| child_lens[slot]);

        // Every slot must reference a declared child; negative ids never do.
        let type_id_slice: &[i8] = type_ids.typed_data();
        let invalid_type_ids = type_id_slice
            .iter()
            .filter(|id| child_len(**id).is_none())
            .collect::<Vec<&i8>>();
        if !invalid_type_ids.is_empty() {
            return Err(ArrowError::InvalidArgumentError(format!(
                "Type Ids must be positive and cannot be greater than the number of \
                child arrays, found:\n{invalid_type_ids:?}"
            )));
        }

        // Dense unions: each offset must index into the child chosen by the
        // slot's type id, not merely stay below the number of slots.
        if let Some(offset_buffer) = &value_offsets {
            let offsets_slice: &[i32] = offset_buffer.typed_data();
            let invalid_offsets = type_id_slice
                .iter()
                .zip(offsets_slice)
                .filter(|(id, offset)| {
                    // Type ids were validated above, so the lookup succeeds;
                    // falling back to 0 keeps this closure panic-free.
                    let len = child_len(**id).unwrap_or(0);
                    usize::try_from(**offset).map_or(true, |o| o >= len)
                })
                .map(|(_, offset)| offset)
                .collect::<Vec<&i32>>();
            if !invalid_offsets.is_empty() {
                return Err(ArrowError::InvalidArgumentError(format!(
                    "Offsets must be positive and within the length of the Array, \
                    found:\n{invalid_offsets:?}"
                )));
            }
        }

        // SAFETY: the type ids and offsets were checked above, and the
        // assembled data is validated again before it is handed out.
        let new_self = unsafe {
            Self::new_unchecked(field_type_ids, type_ids, value_offsets, child_arrays)
        };
        new_self.to_data().validate()?;
        Ok(new_self)
    }
    /// Returns the child array registered under `type_id`.
    ///
    /// # Panics
    ///
    /// Panics with `"invalid type id"` if `type_id` is negative, lies beyond
    /// the largest declared type id, or has no child array associated with it.
    pub fn child(&self, type_id: i8) -> &ArrayRef {
        // A checked conversion avoids sign-extending negative ids into huge
        // indices, and `get` turns out-of-range ids into the same panic as
        // ids that fall into a gap between declared fields.
        usize::try_from(type_id)
            .ok()
            .and_then(|slot| self.boxed_fields.get(slot))
            .and_then(|child| child.as_ref())
            .expect("invalid type id")
    }
    /// Returns the type id stored for the slot at `index`.
    ///
    /// # Panics
    ///
    /// Panics if `index` is not within the type id buffer.
    pub fn type_id(&self, index: usize) -> i8 {
        let ids: &[i8] = &self.type_ids;
        ids[index]
    }
    pub fn type_ids(&self) -> &ScalarBuffer<i8> {
        &self.type_ids
    }
    /// Returns the offsets buffer, or `None` when this array has none
    /// (sparse unions are built without one).
    pub fn offsets(&self) -> Option<&ScalarBuffer<i32>> {
        Option::as_ref(&self.offsets)
    }
    /// Returns the position, inside the selected child array, of the value
    /// for the slot at `index`.
    ///
    /// With an offsets buffer the stored offset is returned; without one the
    /// position is the array's own offset plus `index`.
    ///
    /// # Panics
    ///
    /// Panics if `index` is not less than the length of this array.
    pub fn value_offset(&self, index: usize) -> usize {
        assert!(index < self.len());
        if let Some(offsets) = self.offsets.as_ref() {
            return offsets[index] as usize;
        }
        self.offset() + index
    }
    /// Returns the value of slot `i` as a one-element slice of the child
    /// array selected by that slot's type id.
    ///
    /// # Panics
    ///
    /// Panics if `i` is out of bounds or the slot's type id has no child.
    pub fn value(&self, i: usize) -> ArrayRef {
        let selected = self.type_id(i);
        let position = self.value_offset(i);
        self.child(selected).slice(position, 1)
    }
    /// Returns the names of the union's fields, in declaration order.
    pub fn type_names(&self) -> Vec<&str> {
        let fields = match self.data.data_type() {
            DataType::Union(fields, _) => fields,
            _ => unreachable!("Union array's data type is not a union!"),
        };
        let mut names = Vec::new();
        for (_, field) in fields.iter() {
            names.push(field.name().as_str());
        }
        names
    }
    /// Reports whether the array's data type is a union in dense mode.
    fn is_dense(&self) -> bool {
        if let DataType::Union(_, mode) = self.data.data_type() {
            matches!(mode, UnionMode::Dense)
        } else {
            unreachable!("Union array's data type is not a union!")
        }
    }
    pub fn slice(&self, offset: usize, length: usize) -> Self {
        self.data.slice(offset, length).into()
    }
}
impl From<ArrayData> for UnionArray {
    fn from(data: ArrayData) -> Self {
        let (fields, mode) = match data.data_type() {
            DataType::Union(fields, mode) => (fields, *mode),
            d => panic!("UnionArray expected ArrayData with type Union got {d}"),
        };
        let (type_ids, offsets) = match mode {
            UnionMode::Sparse => (
                ScalarBuffer::new(data.buffers()[0].clone(), data.offset(), data.len()),
                None,
            ),
            UnionMode::Dense => (
                ScalarBuffer::new(data.buffers()[0].clone(), data.offset(), data.len()),
                Some(ScalarBuffer::new(
                    data.buffers()[1].clone(),
                    data.offset(),
                    data.len(),
                )),
            ),
        };
        let max_id = fields.iter().map(|(i, _)| i).max().unwrap_or_default() as usize;
        let mut boxed_fields = vec![None; max_id + 1];
        for (cd, (field_id, _)) in data.child_data().iter().zip(fields.iter()) {
            boxed_fields[field_id as usize] = Some(make_array(cd.clone()));
        }
        Self {
            data,
            type_ids,
            offsets,
            boxed_fields,
        }
    }
}
impl From<UnionArray> for ArrayData {
    fn from(array: UnionArray) -> Self {
        array.data
    }
}
impl Array for UnionArray {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn data(&self) -> &ArrayData {
        &self.data
    }
    fn to_data(&self) -> ArrayData {
        self.data.clone()
    }
    fn into_data(self) -> ArrayData {
        self.into()
    }
    fn slice(&self, offset: usize, length: usize) -> ArrayRef {
        Arc::new(self.slice(offset, length))
    }
    fn nulls(&self) -> Option<&NullBuffer> {
        None
    }
    fn is_null(&self, _index: usize) -> bool {
        false
    }
    fn is_valid(&self, _index: usize) -> bool {
        true
    }
    fn null_count(&self) -> usize {
        0
    }
}
impl std::fmt::Debug for UnionArray {
    /// Writes a multi-line dump: a header naming the mode, the type id
    /// buffer, the offsets buffer when present, and then every child array
    /// together with its type id, field name and data type.
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        let header = match self.is_dense() {
            true => "UnionArray(Dense)\n[",
            false => "UnionArray(Sparse)\n[",
        };
        writeln!(f, "{header}")?;

        writeln!(f, "-- type id buffer:")?;
        writeln!(f, "{:?}", self.type_ids)?;

        if let Some(offsets) = self.offsets.as_ref() {
            writeln!(f, "-- offsets buffer:")?;
            writeln!(f, "{:?}", offsets)?;
        }

        let union_fields = match self.data_type() {
            DataType::Union(fields, _) => fields,
            _ => unreachable!(),
        };
        for (type_id, field) in union_fields.iter() {
            let child = self.child(type_id);
            writeln!(
                f,
                "-- child {}: \"{}\" ({:?})",
                type_id,
                field.name(),
                field.data_type()
            )?;
            std::fmt::Debug::fmt(child, f)?;
            writeln!(f)?;
        }

        writeln!(f, "]")
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::builder::UnionBuilder;
    use crate::cast::AsArray;
    use crate::types::{Float32Type, Float64Type, Int32Type, Int64Type};
    use crate::RecordBatch;
    use crate::{Float64Array, Int32Array, Int64Array, StringArray};
    use arrow_schema::Schema;
    use std::sync::Arc;
    #[test]
    fn test_dense_i32() {
        // Seven Int32 values spread over three named fields.
        let appended = [
            ("a", 1_i32),
            ("b", 2),
            ("c", 3),
            ("a", 4),
            ("c", 5),
            ("a", 6),
            ("b", 7),
        ];
        let mut builder = UnionBuilder::new_dense();
        for (name, value) in appended.iter() {
            builder.append::<Int32Type>(name, *value).unwrap();
        }
        let union = builder.build().unwrap();

        let expected_type_ids = vec![0_i8, 1, 2, 0, 2, 0, 1];
        let expected_offsets = vec![0_i32, 0, 0, 1, 1, 2, 1];
        let expected_array_values = [1_i32, 2, 3, 4, 5, 6, 7];

        // Type ids, both as a whole buffer and slot by slot.
        assert_eq!(*union.type_ids(), expected_type_ids);
        for (slot, &type_id) in expected_type_ids.iter().enumerate() {
            assert_eq!(type_id, union.type_id(slot));
        }

        // Offsets, both as a whole buffer and slot by slot.
        assert_eq!(*union.offsets().unwrap(), expected_offsets);
        for (slot, &offset) in expected_offsets.iter().enumerate() {
            assert_eq!(union.value_offset(slot), offset as usize);
        }

        // Each child only holds the values appended under its name.
        let expected_children = vec![
            (0_i8, vec![1_i32, 4, 6]),
            (1, vec![2, 7]),
            (2, vec![3, 5]),
        ];
        for (type_id, values) in expected_children {
            assert_eq!(
                *union.child(type_id).as_primitive::<Int32Type>().values(),
                values
            );
        }

        // Reading slot by slot yields the values in append order.
        assert_eq!(expected_array_values.len(), union.len());
        for (slot, expected) in expected_array_values.iter().enumerate() {
            assert!(!union.is_null(slot));
            let value = union.value(slot);
            let ints = value.as_any().downcast_ref::<Int32Array>().unwrap();
            assert_eq!(ints.len(), 1);
            assert_eq!(expected, &ints.value(0));
        }
    }
    #[test]
    #[cfg_attr(miri, ignore)]
    fn test_dense_i32_large() {
        // 1024 values, all appended to the same field.
        let expected_array_values: Vec<i32> = (1..=1024).collect();
        let expected_offsets: Vec<i32> = (0..1024).collect();
        let expected_type_ids = vec![0_i8; 1024];

        let mut builder = UnionBuilder::new_dense();
        for value in &expected_array_values {
            builder.append::<Int32Type>("a", *value).unwrap();
        }
        let union = builder.build().unwrap();

        assert_eq!(*union.type_ids(), expected_type_ids);
        for (slot, &type_id) in expected_type_ids.iter().enumerate() {
            assert_eq!(type_id, union.type_id(slot));
        }

        assert_eq!(*union.offsets().unwrap(), expected_offsets);
        for (slot, &offset) in expected_offsets.iter().enumerate() {
            assert_eq!(union.value_offset(slot), offset as usize);
        }

        for (slot, expected) in expected_array_values.iter().enumerate() {
            assert!(!union.is_null(slot));
            let value = union.value(slot);
            let ints = value.as_primitive::<Int32Type>();
            assert_eq!(ints.len(), 1);
            assert_eq!(expected, &ints.value(0));
        }
    }
    #[test]
    fn test_dense_mixed() {
        // Asserts that `slot` is a one-element Int32Array holding `expected`.
        fn check_i32(slot: &ArrayRef, expected: i32) {
            let ints = slot.as_any().downcast_ref::<Int32Array>().unwrap();
            assert_eq!(ints.len(), 1);
            assert_eq!(expected, ints.value(0));
        }
        // Asserts that `slot` is a one-element Int64Array holding `expected`.
        fn check_i64(slot: &ArrayRef, expected: i64) {
            let longs = slot.as_any().downcast_ref::<Int64Array>().unwrap();
            assert_eq!(longs.len(), 1);
            assert_eq!(expected, longs.value(0));
        }

        let mut builder = UnionBuilder::new_dense();
        builder.append::<Int32Type>("a", 1).unwrap();
        builder.append::<Int64Type>("c", 3).unwrap();
        builder.append::<Int32Type>("a", 4).unwrap();
        builder.append::<Int64Type>("c", 5).unwrap();
        builder.append::<Int32Type>("a", 6).unwrap();
        let union = builder.build().unwrap();

        assert_eq!(5, union.len());
        for i in 0..union.len() {
            let slot = union.value(i);
            assert!(!union.is_null(i));
            match i {
                0 => check_i32(&slot, 1),
                1 => check_i64(&slot, 3),
                2 => check_i32(&slot, 4),
                3 => check_i64(&slot, 5),
                4 => check_i32(&slot, 6),
                _ => unreachable!(),
            }
        }
    }
    #[test]
    fn test_dense_mixed_with_nulls() {
        // Asserts that `slot` is a one-element, non-null Int32Array holding `expected`.
        fn check_i32(slot: &ArrayRef, expected: i32) {
            let ints = slot.as_any().downcast_ref::<Int32Array>().unwrap();
            assert!(!ints.is_null(0));
            assert_eq!(ints.len(), 1);
            assert_eq!(expected, ints.value(0));
        }
        // Asserts that `slot` is a one-element, non-null Int64Array holding `expected`.
        fn check_i64(slot: &ArrayRef, expected: i64) {
            let longs = slot.as_any().downcast_ref::<Int64Array>().unwrap();
            assert!(!longs.is_null(0));
            assert_eq!(longs.len(), 1);
            assert_eq!(expected, longs.value(0));
        }

        let mut builder = UnionBuilder::new_dense();
        builder.append::<Int32Type>("a", 1).unwrap();
        builder.append::<Int64Type>("c", 3).unwrap();
        builder.append::<Int32Type>("a", 10).unwrap();
        builder.append_null::<Int32Type>("a").unwrap();
        builder.append::<Int32Type>("a", 6).unwrap();
        let union = builder.build().unwrap();

        assert_eq!(5, union.len());
        for i in 0..union.len() {
            let slot = union.value(i);
            match i {
                0 => check_i32(&slot, 1),
                1 => check_i64(&slot, 3),
                2 => check_i32(&slot, 10),
                // The null lives in the child array, not in the union itself.
                3 => assert!(slot.is_null(0)),
                4 => check_i32(&slot, 6),
                _ => unreachable!(),
            }
        }
    }
    #[test]
    fn test_dense_mixed_with_nulls_and_offset() {
        // Asserts that `slot` is a one-element, non-null Int32Array holding `expected`.
        fn check_i32(slot: &ArrayRef, expected: i32) {
            let ints = slot.as_any().downcast_ref::<Int32Array>().unwrap();
            assert!(!ints.is_null(0));
            assert_eq!(ints.len(), 1);
            assert_eq!(expected, ints.value(0));
        }

        let mut builder = UnionBuilder::new_dense();
        builder.append::<Int32Type>("a", 1).unwrap();
        builder.append::<Int64Type>("c", 3).unwrap();
        builder.append::<Int32Type>("a", 10).unwrap();
        builder.append_null::<Int32Type>("a").unwrap();
        builder.append::<Int32Type>("a", 6).unwrap();
        let union = builder.build().unwrap();

        // Keep only the last three slots: 10, null, 6.
        let slice = union.slice(2, 3);
        let new_union = slice.as_any().downcast_ref::<UnionArray>().unwrap();

        assert_eq!(3, new_union.len());
        for i in 0..new_union.len() {
            let slot = new_union.value(i);
            match i {
                0 => check_i32(&slot, 10),
                1 => assert!(slot.is_null(0)),
                2 => check_i32(&slot, 6),
                _ => unreachable!(),
            }
        }
    }
    #[test]
    fn test_dense_mixed_with_str() {
        // Each helper reads slot `index` and compares its single value.
        fn expect_i32(array: &UnionArray, index: usize, expected: i32) {
            let slot = array.value(index);
            let value = slot.as_any().downcast_ref::<Int32Array>().unwrap().value(0);
            assert_eq!(expected, value);
        }
        fn expect_str(array: &UnionArray, index: usize, expected: &str) {
            let slot = array.value(index);
            let value = slot
                .as_any()
                .downcast_ref::<StringArray>()
                .unwrap()
                .value(0);
            assert_eq!(expected, value);
        }
        fn expect_f64(array: &UnionArray, index: usize, expected: f64) {
            let slot = array.value(index);
            let value = slot
                .as_any()
                .downcast_ref::<Float64Array>()
                .unwrap()
                .value(0);
            assert_eq!(expected, value);
        }

        let type_ids = [1_i8, 0, 0, 2, 0, 1];
        let offsets = [0_i32, 0, 1, 0, 2, 1];

        let children: Vec<(Field, Arc<dyn Array>)> = vec![
            (
                Field::new("A", DataType::Utf8, false),
                Arc::new(StringArray::from(vec!["foo", "bar", "baz"])),
            ),
            (
                Field::new("B", DataType::Int32, false),
                Arc::new(Int32Array::from(vec![5, 6])),
            ),
            (
                Field::new("C", DataType::Float64, false),
                Arc::new(Float64Array::from(vec![10.0])),
            ),
        ];
        let array = UnionArray::try_new(
            &[0, 1, 2],
            Buffer::from_slice_ref(type_ids),
            Some(Buffer::from_slice_ref(offsets)),
            children,
        )
        .unwrap();

        // The buffers round-trip unchanged.
        assert_eq!(*array.type_ids(), type_ids);
        for (i, id) in type_ids.iter().enumerate() {
            assert_eq!(id, &array.type_id(i));
        }
        assert_eq!(*array.offsets().unwrap(), offsets);
        for (i, offset) in offsets.iter().enumerate() {
            assert_eq!(*offset as usize, array.value_offset(i));
        }

        // Every slot resolves to the right child and position.
        assert_eq!(6, array.len());
        expect_i32(&array, 0, 5);
        expect_str(&array, 1, "foo");
        expect_str(&array, 2, "bar");
        expect_f64(&array, 3, 10.0);
        expect_str(&array, 4, "baz");
        expect_i32(&array, 5, 6);
    }
    #[test]
    fn test_sparse_i32() {
        // Seven Int32 values spread over three named fields.
        let appended = [
            ("a", 1_i32),
            ("b", 2),
            ("c", 3),
            ("a", 4),
            ("c", 5),
            ("a", 6),
            ("b", 7),
        ];
        let mut builder = UnionBuilder::new_sparse();
        for (name, value) in appended.iter() {
            builder.append::<Int32Type>(name, *value).unwrap();
        }
        let union = builder.build().unwrap();

        let expected_type_ids = vec![0_i8, 1, 2, 0, 2, 0, 1];
        let expected_array_values = [1_i32, 2, 3, 4, 5, 6, 7];

        assert_eq!(*union.type_ids(), expected_type_ids);
        for (slot, &type_id) in expected_type_ids.iter().enumerate() {
            assert_eq!(type_id, union.type_id(slot));
        }

        // A sparse union carries no offsets buffer.
        assert!(union.offsets().is_none());

        // Every child spans all seven slots, zero-filled where another
        // field supplied the value.
        let expected_children = vec![
            (0_i8, vec![1_i32, 0, 0, 4, 0, 6, 0]),
            (1, vec![0, 2, 0, 0, 0, 0, 7]),
            (2, vec![0, 0, 3, 0, 5, 0, 0]),
        ];
        for (type_id, values) in expected_children {
            assert_eq!(
                *union.child(type_id).as_primitive::<Int32Type>().values(),
                values
            );
        }

        assert_eq!(expected_array_values.len(), union.len());
        for (slot, expected) in expected_array_values.iter().enumerate() {
            assert!(!union.is_null(slot));
            let value = union.value(slot);
            let ints = value.as_any().downcast_ref::<Int32Array>().unwrap();
            assert_eq!(ints.len(), 1);
            assert_eq!(expected, &ints.value(0));
        }
    }
    #[test]
    fn test_sparse_mixed() {
        // Asserts that `slot` is a one-element Int32Array holding `expected`.
        fn check_i32(slot: &ArrayRef, expected: i32) {
            let ints = slot.as_any().downcast_ref::<Int32Array>().unwrap();
            assert_eq!(ints.len(), 1);
            assert_eq!(expected, ints.value(0));
        }
        // Asserts that `slot` is a one-element Float64Array holding `expected`.
        fn check_f64(slot: &ArrayRef, expected: f64) {
            let floats = slot.as_any().downcast_ref::<Float64Array>().unwrap();
            assert_eq!(floats.len(), 1);
            assert_eq!(expected, floats.value(0));
        }

        let mut builder = UnionBuilder::new_sparse();
        builder.append::<Int32Type>("a", 1).unwrap();
        builder.append::<Float64Type>("c", 3.0).unwrap();
        builder.append::<Int32Type>("a", 4).unwrap();
        builder.append::<Float64Type>("c", 5.0).unwrap();
        builder.append::<Int32Type>("a", 6).unwrap();
        let union = builder.build().unwrap();

        let expected_type_ids = vec![0_i8, 1, 0, 1, 0];
        assert_eq!(*union.type_ids(), expected_type_ids);
        for (slot, &type_id) in expected_type_ids.iter().enumerate() {
            assert_eq!(type_id, union.type_id(slot));
        }

        // A sparse union carries no offsets buffer.
        assert!(union.offsets().is_none());

        for i in 0..union.len() {
            let slot = union.value(i);
            assert!(!union.is_null(i));
            match i {
                0 => check_i32(&slot, 1),
                1 => check_f64(&slot, 3_f64),
                2 => check_i32(&slot, 4),
                3 => check_f64(&slot, 5_f64),
                4 => check_i32(&slot, 6),
                _ => unreachable!(),
            }
        }
    }
    #[test]
    fn test_sparse_mixed_with_nulls() {
        // Asserts that `slot` is a one-element, non-null Int32Array holding `expected`.
        fn check_i32(slot: &ArrayRef, expected: i32) {
            let ints = slot.as_any().downcast_ref::<Int32Array>().unwrap();
            assert!(!ints.is_null(0));
            assert_eq!(ints.len(), 1);
            assert_eq!(expected, ints.value(0));
        }
        // Asserts that `slot` is a one-element, non-null Float64Array holding `expected`.
        fn check_f64(slot: &ArrayRef, expected: f64) {
            let floats = slot.as_any().downcast_ref::<Float64Array>().unwrap();
            assert!(!floats.is_null(0));
            assert_eq!(floats.len(), 1);
            assert_eq!(expected, floats.value(0));
        }

        let mut builder = UnionBuilder::new_sparse();
        builder.append::<Int32Type>("a", 1).unwrap();
        builder.append_null::<Int32Type>("a").unwrap();
        builder.append::<Float64Type>("c", 3.0).unwrap();
        builder.append::<Int32Type>("a", 4).unwrap();
        let union = builder.build().unwrap();

        let expected_type_ids = vec![0_i8, 0, 1, 0];
        assert_eq!(*union.type_ids(), expected_type_ids);
        for (slot, &type_id) in expected_type_ids.iter().enumerate() {
            assert_eq!(type_id, union.type_id(slot));
        }

        // A sparse union carries no offsets buffer.
        assert!(union.offsets().is_none());

        for i in 0..union.len() {
            let slot = union.value(i);
            match i {
                0 => check_i32(&slot, 1),
                // The null lives in the child array, not in the union itself.
                1 => assert!(slot.is_null(0)),
                2 => check_f64(&slot, 3_f64),
                3 => check_i32(&slot, 4),
                _ => unreachable!(),
            }
        }
    }
    #[test]
    fn test_sparse_mixed_with_nulls_and_offset() {
        // Asserts that `slot` is a one-element, non-null Float64 array holding `expected`.
        fn check_f64(slot: &ArrayRef, expected: f64) {
            let floats = slot.as_primitive::<Float64Type>();
            assert!(!floats.is_null(0));
            assert_eq!(floats.len(), 1);
            assert_eq!(floats.value(0), expected);
        }
        // Asserts that `slot` is a one-element, non-null Int32 array holding `expected`.
        fn check_i32(slot: &ArrayRef, expected: i32) {
            let ints = slot.as_primitive::<Int32Type>();
            assert!(!ints.is_null(0));
            assert_eq!(ints.len(), 1);
            assert_eq!(expected, ints.value(0));
        }

        let mut builder = UnionBuilder::new_sparse();
        builder.append::<Int32Type>("a", 1).unwrap();
        builder.append_null::<Int32Type>("a").unwrap();
        builder.append::<Float64Type>("c", 3.0).unwrap();
        builder.append_null::<Float64Type>("c").unwrap();
        builder.append::<Int32Type>("a", 4).unwrap();
        let union = builder.build().unwrap();

        // Drop the first slot: what remains is null, 3.0, null, 4.
        let slice = union.slice(1, 4);
        let new_union = slice.as_any().downcast_ref::<UnionArray>().unwrap();

        assert_eq!(4, new_union.len());
        for i in 0..new_union.len() {
            let slot = new_union.value(i);
            match i {
                0 => assert!(slot.is_null(0)),
                1 => check_f64(&slot, 3_f64),
                2 => assert!(slot.is_null(0)),
                3 => check_i32(&slot, 4),
                _ => unreachable!(),
            }
        }
    }
    // Shared check: the union reports zero nulls and treats every slot as valid.
    fn test_union_validity(union_array: &UnionArray) {
        assert_eq!(union_array.null_count(), 0);

        (0..union_array.len()).for_each(|slot| {
            assert!(!union_array.is_null(slot));
            assert!(union_array.is_valid(slot));
        });
    }
    #[test]
    fn test_union_array_validity() {
        // Fills `builder` with the same mix of values and nulls for both modes.
        fn populate(mut builder: UnionBuilder) -> UnionArray {
            builder.append::<Int32Type>("a", 1).unwrap();
            builder.append_null::<Int32Type>("a").unwrap();
            builder.append::<Float64Type>("c", 3.0).unwrap();
            builder.append_null::<Float64Type>("c").unwrap();
            builder.append::<Int32Type>("a", 4).unwrap();
            builder.build().unwrap()
        }

        let sparse = populate(UnionBuilder::new_sparse());
        test_union_validity(&sparse);

        let dense = populate(UnionBuilder::new_dense());
        test_union_validity(&dense);
    }
    #[test]
    fn test_type_check() {
        let mut builder = UnionBuilder::new_sparse();
        builder.append::<Float32Type>("a", 1.0).unwrap();

        // Re-using field "a" with a different type must be rejected.
        let outcome = builder.append::<Int32Type>("a", 1);
        let err = outcome.unwrap_err().to_string();
        assert!(
            err.contains(
                "Attempt to write col \"a\" with type Int32 doesn't match existing type Float32"
            ),
            "{}",
            err
        );
    }
    #[test]
    fn slice_union_array() {
        // Fills `builder` with: 1, null (Int32), 3.0, null (Float64), 4.
        fn create_union(mut builder: UnionBuilder) -> UnionArray {
            builder.append::<Int32Type>("a", 1).unwrap();
            builder.append_null::<Int32Type>("a").unwrap();
            builder.append::<Float64Type>("c", 3.0).unwrap();
            builder.append_null::<Float64Type>("c").unwrap();
            builder.append::<Int32Type>("a", 4).unwrap();
            builder.build().unwrap()
        }

        // Wraps the union as the only column of a record batch.
        fn create_batch(union: UnionArray) -> RecordBatch {
            let field = Field::new("struct_array", union.data_type().clone(), true);
            let schema = Arc::new(Schema::new(vec![field]));
            RecordBatch::try_new(schema, vec![Arc::new(union)]).unwrap()
        }

        // Checks the three slots left after slicing the batch with (1, 3).
        fn test_slice_union(record_batch_slice: RecordBatch) {
            let column = record_batch_slice.column(0);
            let union_slice = column.as_any().downcast_ref::<UnionArray>().unwrap();

            for (slot, expected_id) in [0_i8, 1, 1].iter().enumerate() {
                assert_eq!(union_slice.type_id(slot), *expected_id);
            }

            let first = union_slice.value(0);
            let first = first.as_primitive::<Int32Type>();
            assert_eq!(first.len(), 1);
            assert!(first.is_null(0));

            let second = union_slice.value(1);
            let second = second.as_primitive::<Float64Type>();
            assert_eq!(second.len(), 1);
            assert!(second.is_valid(0));
            assert_eq!(second.value(0), 3.0);

            let third = union_slice.value(2);
            let third = third.as_primitive::<Float64Type>();
            assert_eq!(third.len(), 1);
            assert!(third.is_null(0));
        }

        // The same expectations hold for both union modes.
        for builder in vec![UnionBuilder::new_sparse(), UnionBuilder::new_dense()] {
            let record_batch = create_batch(create_union(builder));
            test_slice_union(record_batch.slice(1, 3));
        }
    }
    #[test]
    fn test_custom_type_ids() {
        // Each helper reads slot `index` and checks its type, length and value.
        fn expect_int(array: &UnionArray, index: usize, expected: i32) {
            let v = array.value(index);
            assert_eq!(v.data_type(), &DataType::Int32);
            assert_eq!(v.len(), 1);
            assert_eq!(v.as_primitive::<Int32Type>().value(0), expected);
        }
        fn expect_string(array: &UnionArray, index: usize, expected: &str) {
            let v = array.value(index);
            assert_eq!(v.data_type(), &DataType::Utf8);
            assert_eq!(v.len(), 1);
            assert_eq!(v.as_string::<i32>().value(0), expected);
        }
        fn expect_float(array: &UnionArray, index: usize, expected: f64) {
            let v = array.value(index);
            assert_eq!(v.data_type(), &DataType::Float64);
            assert_eq!(v.len(), 1);
            assert_eq!(v.as_primitive::<Float64Type>().value(0), expected);
        }

        // Type ids are neither contiguous nor in declaration order.
        let union_fields = UnionFields::new(
            vec![8, 4, 9],
            vec![
                Field::new("strings", DataType::Utf8, false),
                Field::new("integers", DataType::Int32, false),
                Field::new("floats", DataType::Float64, false),
            ],
        );
        let data_type = DataType::Union(union_fields, UnionMode::Dense);

        let children = vec![
            StringArray::from(vec!["foo", "bar", "baz"]).into_data(),
            Int32Array::from(vec![5, 6, 4]).into_data(),
            Float64Array::from(vec![10.0]).into_data(),
        ];
        let type_ids = Buffer::from_vec(vec![4_i8, 8, 4, 8, 9, 4, 8]);
        let value_offsets = Buffer::from_vec(vec![0_i32, 0, 1, 1, 0, 2, 2]);

        let data = ArrayData::builder(data_type)
            .len(7)
            .buffers(vec![type_ids, value_offsets])
            .child_data(children)
            .build()
            .unwrap();
        let array = UnionArray::from(data);

        expect_int(&array, 0, 5);
        expect_string(&array, 1, "foo");
        expect_int(&array, 2, 6);
        expect_string(&array, 3, "bar");
        expect_float(&array, 4, 10.0);
        expect_int(&array, 5, 4);
        expect_string(&array, 6, "baz");
    }
}