use crate::array::{get_offsets, print_long_array};
use crate::{make_array, Array, ArrayRef, ListArray, StringArray, StructArray};
use arrow_buffer::{ArrowNativeType, Buffer, NullBuffer, OffsetBuffer, ToByteSlice};
use arrow_data::ArrayData;
use arrow_schema::{ArrowError, DataType, Field};
use std::any::Any;
use std::sync::Arc;
#[derive(Clone)]
pub struct MapArray {
    data: ArrayData,
    entries: ArrayRef,
    keys: ArrayRef,
    values: ArrayRef,
    value_offsets: OffsetBuffer<i32>,
}
impl MapArray {
    pub fn keys(&self) -> &ArrayRef {
        &self.keys
    }
    pub fn values(&self) -> &ArrayRef {
        &self.values
    }
    pub fn entries(&self) -> &ArrayRef {
        &self.entries
    }
    pub fn key_type(&self) -> &DataType {
        self.keys.data_type()
    }
    pub fn value_type(&self) -> &DataType {
        self.values.data_type()
    }
    pub unsafe fn value_unchecked(&self, i: usize) -> ArrayRef {
        let end = *self.value_offsets().get_unchecked(i + 1);
        let start = *self.value_offsets().get_unchecked(i);
        self.entries
            .slice(start.to_usize().unwrap(), (end - start).to_usize().unwrap())
    }
    pub fn value(&self, i: usize) -> ArrayRef {
        let end = self.value_offsets()[i + 1] as usize;
        let start = self.value_offsets()[i] as usize;
        self.entries.slice(start, end - start)
    }
    #[inline]
    pub fn value_offsets(&self) -> &[i32] {
        &self.value_offsets
    }
    #[inline]
    pub fn value_length(&self, i: usize) -> i32 {
        let offsets = self.value_offsets();
        offsets[i + 1] - offsets[i]
    }
    pub fn slice(&self, offset: usize, length: usize) -> Self {
        self.data.slice(offset, length).into()
    }
}
impl From<ArrayData> for MapArray {
    fn from(data: ArrayData) -> Self {
        Self::try_new_from_array_data(data)
            .expect("Expected infallible creation of MapArray from ArrayData failed")
    }
}
impl From<MapArray> for ArrayData {
    fn from(array: MapArray) -> Self {
        array.data
    }
}
impl MapArray {
    fn try_new_from_array_data(data: ArrayData) -> Result<Self, ArrowError> {
        if !matches!(data.data_type(), DataType::Map(_, _)) {
            return Err(ArrowError::InvalidArgumentError(format!(
                "MapArray expected ArrayData with DataType::Map got {}",
                data.data_type()
            )));
        }
        if data.buffers().len() != 1 {
            return Err(ArrowError::InvalidArgumentError(
                format!("MapArray data should contain a single buffer only (value offsets), had {}",
                        data.len())));
        }
        if data.child_data().len() != 1 {
            return Err(ArrowError::InvalidArgumentError(format!(
                "MapArray should contain a single child array (values array), had {}",
                data.child_data().len()
            )));
        }
        let entries = data.child_data()[0].clone();
        if let DataType::Struct(fields) = entries.data_type() {
            if fields.len() != 2 {
                return Err(ArrowError::InvalidArgumentError(format!(
                "MapArray should contain a struct array with 2 fields, have {} fields",
                fields.len()
            )));
            }
        } else {
            return Err(ArrowError::InvalidArgumentError(format!(
                "MapArray should contain a struct array child, found {:?}",
                entries.data_type()
            )));
        }
        let keys = make_array(entries.child_data()[0].clone());
        let values = make_array(entries.child_data()[1].clone());
        let entries = make_array(entries);
        let value_offsets = unsafe { get_offsets(&data) };
        Ok(Self {
            data,
            entries,
            keys,
            values,
            value_offsets,
        })
    }
    pub fn new_from_strings<'a>(
        keys: impl Iterator<Item = &'a str>,
        values: &dyn Array,
        entry_offsets: &[u32],
    ) -> Result<Self, ArrowError> {
        let entry_offsets_buffer = Buffer::from(entry_offsets.to_byte_slice());
        let keys_data = StringArray::from_iter_values(keys);
        let keys_field = Field::new("keys", DataType::Utf8, false);
        let values_field = Field::new(
            "values",
            values.data_type().clone(),
            values.null_count() > 0,
        );
        let entry_struct = StructArray::from(vec![
            (keys_field, Arc::new(keys_data) as ArrayRef),
            (values_field, make_array(values.to_data())),
        ]);
        let map_data_type = DataType::Map(
            Arc::new(Field::new(
                "entries",
                entry_struct.data_type().clone(),
                true,
            )),
            false,
        );
        let map_data = ArrayData::builder(map_data_type)
            .len(entry_offsets.len() - 1)
            .add_buffer(entry_offsets_buffer)
            .add_child_data(entry_struct.into_data())
            .build()?;
        Ok(MapArray::from(map_data))
    }
}
impl Array for MapArray {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn data(&self) -> &ArrayData {
        &self.data
    }
    fn to_data(&self) -> ArrayData {
        self.data.clone()
    }
    fn into_data(self) -> ArrayData {
        self.into()
    }
    fn slice(&self, offset: usize, length: usize) -> ArrayRef {
        Arc::new(self.slice(offset, length))
    }
    fn nulls(&self) -> Option<&NullBuffer> {
        self.data.nulls()
    }
    fn get_buffer_memory_size(&self) -> usize {
        self.data.get_buffer_memory_size()
    }
    fn get_array_memory_size(&self) -> usize {
        self.data.get_array_memory_size() + std::mem::size_of_val(self)
    }
}
impl std::fmt::Debug for MapArray {
    fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
        write!(f, "MapArray\n[\n")?;
        print_long_array(self, f, |array, index, f| {
            std::fmt::Debug::fmt(&array.value(index), f)
        })?;
        write!(f, "]")
    }
}
impl From<MapArray> for ListArray {
    fn from(value: MapArray) -> Self {
        let field = match value.data_type() {
            DataType::Map(field, _) => field,
            _ => unreachable!("This should be a map type."),
        };
        let data_type = DataType::List(field.clone());
        let builder = value.into_data().into_builder().data_type(data_type);
        let array_data = unsafe { builder.build_unchecked() };
        ListArray::from(array_data)
    }
}
#[cfg(test)]
mod tests {
    use crate::cast::AsArray;
    use crate::types::UInt32Type;
    use crate::{Int32Array, UInt32Array};
    use arrow_schema::Fields;
    use std::sync::Arc;
    use super::*;
    fn create_from_buffers() -> MapArray {
        let keys_data = ArrayData::builder(DataType::Int32)
            .len(8)
            .add_buffer(Buffer::from(&[0, 1, 2, 3, 4, 5, 6, 7].to_byte_slice()))
            .build()
            .unwrap();
        let values_data = ArrayData::builder(DataType::UInt32)
            .len(8)
            .add_buffer(Buffer::from(
                &[0u32, 10, 20, 30, 40, 50, 60, 70].to_byte_slice(),
            ))
            .build()
            .unwrap();
        let entry_offsets = Buffer::from(&[0, 3, 6, 8].to_byte_slice());
        let keys = Field::new("keys", DataType::Int32, false);
        let values = Field::new("values", DataType::UInt32, false);
        let entry_struct = StructArray::from(vec![
            (keys, make_array(keys_data)),
            (values, make_array(values_data)),
        ]);
        let map_data_type = DataType::Map(
            Arc::new(Field::new(
                "entries",
                entry_struct.data_type().clone(),
                true,
            )),
            false,
        );
        let map_data = ArrayData::builder(map_data_type)
            .len(3)
            .add_buffer(entry_offsets)
            .add_child_data(entry_struct.into_data())
            .build()
            .unwrap();
        MapArray::from(map_data)
    }
    #[test]
    fn test_map_array() {
        let key_data = ArrayData::builder(DataType::Int32)
            .len(8)
            .add_buffer(Buffer::from(&[0, 1, 2, 3, 4, 5, 6, 7].to_byte_slice()))
            .build()
            .unwrap();
        let value_data = ArrayData::builder(DataType::UInt32)
            .len(8)
            .add_buffer(Buffer::from(
                &[0u32, 10, 20, 0, 40, 0, 60, 70].to_byte_slice(),
            ))
            .null_bit_buffer(Some(Buffer::from(&[0b11010110])))
            .build()
            .unwrap();
        let entry_offsets = Buffer::from(&[0, 3, 6, 8].to_byte_slice());
        let keys_field = Field::new("keys", DataType::Int32, false);
        let values_field = Field::new("values", DataType::UInt32, true);
        let entry_struct = StructArray::from(vec![
            (keys_field.clone(), make_array(key_data)),
            (values_field.clone(), make_array(value_data.clone())),
        ]);
        let map_data_type = DataType::Map(
            Arc::new(Field::new(
                "entries",
                entry_struct.data_type().clone(),
                true,
            )),
            false,
        );
        let map_data = ArrayData::builder(map_data_type)
            .len(3)
            .add_buffer(entry_offsets)
            .add_child_data(entry_struct.into_data())
            .build()
            .unwrap();
        let map_array = MapArray::from(map_data);
        assert_eq!(value_data, map_array.values().to_data());
        assert_eq!(&DataType::UInt32, map_array.value_type());
        assert_eq!(3, map_array.len());
        assert_eq!(0, map_array.null_count());
        assert_eq!(6, map_array.value_offsets()[2]);
        assert_eq!(2, map_array.value_length(2));
        let key_array = Arc::new(Int32Array::from(vec![0, 1, 2])) as ArrayRef;
        let value_array =
            Arc::new(UInt32Array::from(vec![None, Some(10u32), Some(20)])) as ArrayRef;
        let struct_array = StructArray::from(vec![
            (keys_field.clone(), key_array),
            (values_field.clone(), value_array),
        ]);
        assert_eq!(
            struct_array,
            StructArray::from(map_array.value(0).into_data())
        );
        assert_eq!(
            &struct_array,
            unsafe { map_array.value_unchecked(0) }
                .as_any()
                .downcast_ref::<StructArray>()
                .unwrap()
        );
        for i in 0..3 {
            assert!(map_array.is_valid(i));
            assert!(!map_array.is_null(i));
        }
        let map_array = map_array.slice(1, 2);
        assert_eq!(value_data, map_array.values().to_data());
        assert_eq!(&DataType::UInt32, map_array.value_type());
        assert_eq!(2, map_array.len());
        assert_eq!(0, map_array.null_count());
        assert_eq!(6, map_array.value_offsets()[1]);
        assert_eq!(2, map_array.value_length(1));
        let key_array = Arc::new(Int32Array::from(vec![3, 4, 5])) as ArrayRef;
        let value_array =
            Arc::new(UInt32Array::from(vec![None, Some(40), None])) as ArrayRef;
        let struct_array =
            StructArray::from(vec![(keys_field, key_array), (values_field, value_array)]);
        assert_eq!(
            &struct_array,
            map_array
                .value(0)
                .as_any()
                .downcast_ref::<StructArray>()
                .unwrap()
        );
        assert_eq!(
            &struct_array,
            unsafe { map_array.value_unchecked(0) }
                .as_any()
                .downcast_ref::<StructArray>()
                .unwrap()
        );
    }
    #[test]
    #[ignore = "Test fails because slice of <list<struct>> is still buggy"]
    fn test_map_array_slice() {
        let map_array = create_from_buffers();
        let sliced_array = map_array.slice(1, 2);
        assert_eq!(2, sliced_array.len());
        assert_eq!(1, sliced_array.offset());
        let sliced_array_data = sliced_array.to_data();
        for array_data in sliced_array_data.child_data() {
            assert_eq!(array_data.offset(), 1);
        }
        let sliced_map_array = sliced_array.as_any().downcast_ref::<MapArray>().unwrap();
        assert_eq!(3, sliced_map_array.value_offsets()[0]);
        assert_eq!(3, sliced_map_array.value_length(0));
        assert_eq!(6, sliced_map_array.value_offsets()[1]);
        assert_eq!(2, sliced_map_array.value_length(1));
        let keys_data = ArrayData::builder(DataType::Int32)
            .len(5)
            .add_buffer(Buffer::from(&[3, 4, 5, 6, 7].to_byte_slice()))
            .build()
            .unwrap();
        let values_data = ArrayData::builder(DataType::UInt32)
            .len(5)
            .add_buffer(Buffer::from(&[30u32, 40, 50, 60, 70].to_byte_slice()))
            .build()
            .unwrap();
        let entry_offsets = Buffer::from(&[0, 3, 5].to_byte_slice());
        let keys = Field::new("keys", DataType::Int32, false);
        let values = Field::new("values", DataType::UInt32, false);
        let entry_struct = StructArray::from(vec![
            (keys, make_array(keys_data)),
            (values, make_array(values_data)),
        ]);
        let map_data_type = DataType::Map(
            Arc::new(Field::new(
                "entries",
                entry_struct.data_type().clone(),
                true,
            )),
            false,
        );
        let expected_map_data = ArrayData::builder(map_data_type)
            .len(2)
            .add_buffer(entry_offsets)
            .add_child_data(entry_struct.into_data())
            .build()
            .unwrap();
        let expected_map_array = MapArray::from(expected_map_data);
        assert_eq!(&expected_map_array, sliced_map_array)
    }
    #[test]
    #[should_panic(expected = "index out of bounds: the len is ")]
    fn test_map_array_index_out_of_bound() {
        let map_array = create_from_buffers();
        map_array.value(map_array.len());
    }
    #[test]
    #[should_panic(
        expected = "MapArray expected ArrayData with DataType::Map got Dictionary"
    )]
    fn test_from_array_data_validation() {
        let struct_t = DataType::Struct(Fields::from(vec![
            Field::new("keys", DataType::Int32, true),
            Field::new("values", DataType::UInt32, true),
        ]));
        let dict_t = DataType::Dictionary(Box::new(DataType::Int32), Box::new(struct_t));
        let _ = MapArray::from(ArrayData::new_empty(&dict_t));
    }
    #[test]
    fn test_new_from_strings() {
        let keys = vec!["a", "b", "c", "d", "e", "f", "g", "h"];
        let values_data = UInt32Array::from(vec![0u32, 10, 20, 30, 40, 50, 60, 70]);
        let entry_offsets = [0, 3, 6, 8];
        let map_array = MapArray::new_from_strings(
            keys.clone().into_iter(),
            &values_data,
            &entry_offsets,
        )
        .unwrap();
        assert_eq!(
            &values_data,
            map_array.values().as_primitive::<UInt32Type>()
        );
        assert_eq!(&DataType::UInt32, map_array.value_type());
        assert_eq!(3, map_array.len());
        assert_eq!(0, map_array.null_count());
        assert_eq!(6, map_array.value_offsets()[2]);
        assert_eq!(2, map_array.value_length(2));
        let key_array = Arc::new(StringArray::from(vec!["a", "b", "c"])) as ArrayRef;
        let value_array = Arc::new(UInt32Array::from(vec![0u32, 10, 20])) as ArrayRef;
        let keys_field = Field::new("keys", DataType::Utf8, false);
        let values_field = Field::new("values", DataType::UInt32, false);
        let struct_array =
            StructArray::from(vec![(keys_field, key_array), (values_field, value_array)]);
        assert_eq!(
            struct_array,
            StructArray::from(map_array.value(0).into_data())
        );
        assert_eq!(
            &struct_array,
            unsafe { map_array.value_unchecked(0) }
                .as_any()
                .downcast_ref::<StructArray>()
                .unwrap()
        );
        for i in 0..3 {
            assert!(map_array.is_valid(i));
            assert!(!map_array.is_null(i));
        }
    }
}