use crate::builder::null_buffer_builder::NullBufferBuilder;
use crate::builder::{ArrayBuilder, BufferBuilder};
use crate::types::*;
use crate::{ArrayRef, ArrowPrimitiveType, PrimitiveArray};
use arrow_buffer::{Buffer, MutableBuffer};
use arrow_data::ArrayData;
use arrow_schema::DataType;
use std::any::Any;
use std::sync::Arc;
// Convenience aliases: one `PrimitiveBuilder` specialization per primitive
// Arrow type marker brought in by `crate::types::*`.

/// A [`PrimitiveBuilder`] for [`Int8Type`] arrays.
pub type Int8Builder = PrimitiveBuilder<Int8Type>;
/// A [`PrimitiveBuilder`] for [`Int16Type`] arrays.
pub type Int16Builder = PrimitiveBuilder<Int16Type>;
/// A [`PrimitiveBuilder`] for [`Int32Type`] arrays.
pub type Int32Builder = PrimitiveBuilder<Int32Type>;
/// A [`PrimitiveBuilder`] for [`Int64Type`] arrays.
pub type Int64Builder = PrimitiveBuilder<Int64Type>;
/// A [`PrimitiveBuilder`] for [`UInt8Type`] arrays.
pub type UInt8Builder = PrimitiveBuilder<UInt8Type>;
/// A [`PrimitiveBuilder`] for [`UInt16Type`] arrays.
pub type UInt16Builder = PrimitiveBuilder<UInt16Type>;
/// A [`PrimitiveBuilder`] for [`UInt32Type`] arrays.
pub type UInt32Builder = PrimitiveBuilder<UInt32Type>;
/// A [`PrimitiveBuilder`] for [`UInt64Type`] arrays.
pub type UInt64Builder = PrimitiveBuilder<UInt64Type>;
/// A [`PrimitiveBuilder`] for [`Float32Type`] arrays.
pub type Float32Builder = PrimitiveBuilder<Float32Type>;
/// A [`PrimitiveBuilder`] for [`Float64Type`] arrays.
pub type Float64Builder = PrimitiveBuilder<Float64Type>;
/// A [`PrimitiveBuilder`] for [`TimestampSecondType`] arrays.
pub type TimestampSecondBuilder = PrimitiveBuilder<TimestampSecondType>;
/// A [`PrimitiveBuilder`] for [`TimestampMillisecondType`] arrays.
pub type TimestampMillisecondBuilder = PrimitiveBuilder<TimestampMillisecondType>;
/// A [`PrimitiveBuilder`] for [`TimestampMicrosecondType`] arrays.
pub type TimestampMicrosecondBuilder = PrimitiveBuilder<TimestampMicrosecondType>;
/// A [`PrimitiveBuilder`] for [`TimestampNanosecondType`] arrays.
pub type TimestampNanosecondBuilder = PrimitiveBuilder<TimestampNanosecondType>;
/// A [`PrimitiveBuilder`] for [`Date32Type`] arrays.
pub type Date32Builder = PrimitiveBuilder<Date32Type>;
/// A [`PrimitiveBuilder`] for [`Date64Type`] arrays.
pub type Date64Builder = PrimitiveBuilder<Date64Type>;
/// A [`PrimitiveBuilder`] for [`Time32SecondType`] arrays.
pub type Time32SecondBuilder = PrimitiveBuilder<Time32SecondType>;
/// A [`PrimitiveBuilder`] for [`Time32MillisecondType`] arrays.
pub type Time32MillisecondBuilder = PrimitiveBuilder<Time32MillisecondType>;
/// A [`PrimitiveBuilder`] for [`Time64MicrosecondType`] arrays.
pub type Time64MicrosecondBuilder = PrimitiveBuilder<Time64MicrosecondType>;
/// A [`PrimitiveBuilder`] for [`Time64NanosecondType`] arrays.
pub type Time64NanosecondBuilder = PrimitiveBuilder<Time64NanosecondType>;
/// A [`PrimitiveBuilder`] for [`IntervalYearMonthType`] arrays.
pub type IntervalYearMonthBuilder = PrimitiveBuilder<IntervalYearMonthType>;
/// A [`PrimitiveBuilder`] for [`IntervalDayTimeType`] arrays.
pub type IntervalDayTimeBuilder = PrimitiveBuilder<IntervalDayTimeType>;
/// A [`PrimitiveBuilder`] for [`IntervalMonthDayNanoType`] arrays.
pub type IntervalMonthDayNanoBuilder = PrimitiveBuilder<IntervalMonthDayNanoType>;
/// A [`PrimitiveBuilder`] for [`DurationSecondType`] arrays.
pub type DurationSecondBuilder = PrimitiveBuilder<DurationSecondType>;
/// A [`PrimitiveBuilder`] for [`DurationMillisecondType`] arrays.
pub type DurationMillisecondBuilder = PrimitiveBuilder<DurationMillisecondType>;
/// A [`PrimitiveBuilder`] for [`DurationMicrosecondType`] arrays.
pub type DurationMicrosecondBuilder = PrimitiveBuilder<DurationMicrosecondType>;
/// A [`PrimitiveBuilder`] for [`DurationNanosecondType`] arrays.
pub type DurationNanosecondBuilder = PrimitiveBuilder<DurationNanosecondType>;
/// A [`PrimitiveBuilder`] for [`Decimal128Type`] arrays.
pub type Decimal128Builder = PrimitiveBuilder<Decimal128Type>;
/// A [`PrimitiveBuilder`] for [`Decimal256Type`] arrays.
pub type Decimal256Builder = PrimitiveBuilder<Decimal256Type>;
/// Incremental builder for a [`PrimitiveArray`] of type `T`.
///
/// Values and validity are accumulated in two separate builders that every
/// `append_*` method grows together; `finish` / `finish_cloned` assemble them
/// into a [`PrimitiveArray`].
#[derive(Debug)]
pub struct PrimitiveBuilder<T: ArrowPrimitiveType> {
    /// Accumulates the native values, one slot per appended element
    /// (null elements still occupy a slot).
    values_builder: BufferBuilder<T::Native>,
    /// Accumulates the validity (null) information for each slot.
    null_buffer_builder: NullBufferBuilder,
    /// Data type stamped on produced arrays: `T::DATA_TYPE` by default,
    /// replaceable through `with_data_type`.
    data_type: DataType,
}
impl<T: ArrowPrimitiveType> ArrayBuilder for PrimitiveBuilder<T> {
    fn as_any(&self) -> &dyn Any {
        self
    }
    fn as_any_mut(&mut self) -> &mut dyn Any {
        self
    }
    fn into_box_any(self: Box<Self>) -> Box<dyn Any> {
        self
    }
    fn len(&self) -> usize {
        self.values_builder.len()
    }
    fn is_empty(&self) -> bool {
        self.values_builder.is_empty()
    }
    fn finish(&mut self) -> ArrayRef {
        Arc::new(self.finish())
    }
    fn finish_cloned(&self) -> ArrayRef {
        Arc::new(self.finish_cloned())
    }
}
impl<T: ArrowPrimitiveType> Default for PrimitiveBuilder<T> {
    fn default() -> Self {
        Self::new()
    }
}
impl<T: ArrowPrimitiveType> PrimitiveBuilder<T> {
    /// Creates an empty builder with room for 1024 elements.
    pub fn new() -> Self {
        const DEFAULT_CAPACITY: usize = 1024;
        Self::with_capacity(DEFAULT_CAPACITY)
    }

    /// Creates an empty builder whose value and validity builders are both
    /// created with the given `capacity`.
    pub fn with_capacity(capacity: usize) -> Self {
        let values_builder = BufferBuilder::<T::Native>::new(capacity);
        let null_buffer_builder = NullBufferBuilder::new(capacity);
        Self {
            values_builder,
            null_buffer_builder,
            data_type: T::DATA_TYPE,
        }
    }

    /// Creates a builder that continues from already-populated buffers.
    ///
    /// The number of existing elements is taken from `values_buffer`. When a
    /// `null_buffer` is supplied it is adopted with that same length;
    /// otherwise a validity builder of that length is created with
    /// `NullBufferBuilder::new_with_len` (presumably marking every existing
    /// slot as valid; confirm against `NullBufferBuilder`).
    pub fn new_from_buffer(
        values_buffer: MutableBuffer,
        null_buffer: Option<MutableBuffer>,
    ) -> Self {
        let values_builder = BufferBuilder::<T::Native>::new_from_buffer(values_buffer);
        let existing_len = values_builder.len();
        let null_buffer_builder = match null_buffer {
            Some(buffer) => NullBufferBuilder::new_from_buffer(buffer, existing_len),
            None => NullBufferBuilder::new_with_len(existing_len),
        };
        Self {
            values_builder,
            null_buffer_builder,
            data_type: T::DATA_TYPE,
        }
    }

    /// Replaces the data type that finished arrays will carry.
    ///
    /// # Panics
    ///
    /// Panics if `data_type` is rejected by `PrimitiveArray::<T>::is_compatible`.
    pub fn with_data_type(self, data_type: DataType) -> Self {
        assert!(
            PrimitiveArray::<T>::is_compatible(&data_type),
            "incompatible data type for builder, expected {} got {}",
            T::DATA_TYPE,
            data_type
        );
        // Keep the accumulated buffers, swap only the data type.
        let Self {
            values_builder,
            null_buffer_builder,
            ..
        } = self;
        Self {
            values_builder,
            null_buffer_builder,
            data_type,
        }
    }

    /// Returns the capacity reported by the underlying values builder.
    pub fn capacity(&self) -> usize {
        self.values_builder.capacity()
    }

    /// Appends a single valid value.
    #[inline]
    pub fn append_value(&mut self, v: T::Native) {
        self.null_buffer_builder.append_non_null();
        self.values_builder.append(v);
    }

    /// Appends a single null; the values builder is advanced by one slot so
    /// both builders stay the same length.
    #[inline]
    pub fn append_null(&mut self) {
        self.null_buffer_builder.append_null();
        self.values_builder.advance(1);
    }

    /// Appends `n` nulls, advancing the values builder by `n` slots.
    #[inline]
    pub fn append_nulls(&mut self, n: usize) {
        self.null_buffer_builder.append_n_nulls(n);
        self.values_builder.advance(n);
    }

    /// Appends `Some(value)` as a valid value and `None` as a null.
    #[inline]
    pub fn append_option(&mut self, v: Option<T::Native>) {
        if let Some(value) = v {
            self.append_value(value);
        } else {
            self.append_null();
        }
    }

    /// Appends every element of `v` as a valid value.
    #[inline]
    pub fn append_slice(&mut self, v: &[T::Native]) {
        let count = v.len();
        self.null_buffer_builder.append_n_non_nulls(count);
        self.values_builder.append_slice(v);
    }

    /// Appends `values` together with a per-element validity flag.
    ///
    /// # Panics
    ///
    /// Panics if `values` and `is_valid` differ in length.
    #[inline]
    pub fn append_values(&mut self, values: &[T::Native], is_valid: &[bool]) {
        assert_eq!(
            values.len(),
            is_valid.len(),
            "Value and validity lengths must be equal"
        );
        self.null_buffer_builder.append_slice(is_valid);
        self.values_builder.append_slice(values);
    }

    /// Appends all items produced by `iter` as valid values.
    ///
    /// # Safety
    ///
    /// The validity builder is extended by the iterator's reported upper
    /// bound *before* the items are consumed, so the iterator must yield
    /// exactly that many items; otherwise the value and validity builders
    /// fall out of step. The underlying
    /// `BufferBuilder::append_trusted_len_iter` presumably relies on the same
    /// trusted-length guarantee; confirm against its documentation.
    ///
    /// # Panics
    ///
    /// Panics if the iterator's `size_hint` has no upper bound.
    #[inline]
    pub unsafe fn append_trusted_len_iter(
        &mut self,
        iter: impl IntoIterator<Item = T::Native>,
    ) {
        let iter = iter.into_iter();
        let (_, upper_bound) = iter.size_hint();
        let len = upper_bound.expect("append_trusted_len_iter requires an upper bound");
        self.null_buffer_builder.append_n_non_nulls(len);
        self.values_builder.append_trusted_len_iter(iter);
    }

    /// Builds the [`PrimitiveArray`] from the accumulated data, taking the
    /// buffers out of the internal builders via their `finish` methods.
    pub fn finish(&mut self) -> PrimitiveArray<T> {
        // Capture the length first: finishing the builders hands off their
        // contents.
        let len = self.len();
        let nulls = self.null_buffer_builder.finish();
        let values = self.values_builder.finish();
        let builder = ArrayData::builder(self.data_type.clone())
            .len(len)
            .add_buffer(values)
            .null_bit_buffer(nulls);
        // SAFETY: every `append_*` method grows the value and validity
        // builders together, and `data_type` is either `T::DATA_TYPE` or was
        // checked by `with_data_type`, so validation is skipped here.
        let array_data = unsafe { builder.build_unchecked() };
        PrimitiveArray::<T>::from(array_data)
    }

    /// Builds a [`PrimitiveArray`] from copies of the current buffers,
    /// leaving the builder's contents in place.
    pub fn finish_cloned(&self) -> PrimitiveArray<T> {
        let len = self.len();
        let values = Buffer::from_slice_ref(self.values_builder.as_slice());
        let nulls = self
            .null_buffer_builder
            .as_slice()
            .map(Buffer::from_slice_ref);
        let builder = ArrayData::builder(self.data_type.clone())
            .len(len)
            .add_buffer(values)
            .null_bit_buffer(nulls);
        // SAFETY: same reasoning as in `finish`; the buffers are copies of
        // builders that are kept in step by every `append_*` method.
        let array_data = unsafe { builder.build_unchecked() };
        PrimitiveArray::<T>::from(array_data)
    }

    /// Returns the values appended so far as a slice.
    pub fn values_slice(&self) -> &[T::Native] {
        self.values_builder.as_slice()
    }

    /// Returns the values appended so far as a mutable slice.
    pub fn values_slice_mut(&mut self) -> &mut [T::Native] {
        self.values_builder.as_slice_mut()
    }

    /// Returns the raw validity bytes, if the validity builder exposes any.
    pub fn validity_slice(&self) -> Option<&[u8]> {
        self.null_buffer_builder.as_slice()
    }

    /// Returns the raw validity bytes mutably, if the validity builder
    /// exposes any.
    pub fn validity_slice_mut(&mut self) -> Option<&mut [u8]> {
        self.null_buffer_builder.as_slice_mut()
    }

    /// Returns the mutable values slice and the optional mutable validity
    /// bytes at the same time.
    pub fn slices_mut(&mut self) -> (&mut [T::Native], Option<&mut [u8]>) {
        // The two borrows target distinct fields, so they may coexist.
        let values = self.values_builder.as_slice_mut();
        let validity = self.null_buffer_builder.as_slice_mut();
        (values, validity)
    }
}
impl<P: ArrowPrimitiveType> Extend<Option<P::Native>> for PrimitiveBuilder<P> {
    /// Appends each item of `iter` through `append_option`, so `None` items
    /// become nulls and `Some` items become valid values.
    #[inline]
    fn extend<T: IntoIterator<Item = Option<P::Native>>>(&mut self, iter: T) {
        iter.into_iter().for_each(|item| self.append_option(item));
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::array::{
        Array, BooleanArray, Date32Array, Int32Array, TimestampSecondArray,
    };
    use crate::builder::Int32Builder;
    use arrow_buffer::Buffer;
    use arrow_schema::TimeUnit;

    /// Asserts that two five-element `Int32Array`s agree on length, offset,
    /// null count, per-slot validity and (for valid slots) value.
    fn assert_same_five_elements(arr1: &Int32Array, arr2: &Int32Array) {
        assert_eq!(arr1.len(), arr2.len());
        assert_eq!(arr1.offset(), arr2.offset());
        assert_eq!(arr1.null_count(), arr2.null_count());
        for i in 0..5 {
            assert_eq!(arr1.is_null(i), arr2.is_null(i));
            assert_eq!(arr1.is_valid(i), arr2.is_valid(i));
            if arr1.is_valid(i) {
                assert_eq!(arr1.value(i), arr2.value(i));
            }
        }
    }

    // Appending values one by one yields a fully valid i32 array.
    #[test]
    fn test_primitive_array_builder_i32() {
        let mut builder = Int32Array::builder(5);
        (0..5).for_each(|v| builder.append_value(v));
        let array = builder.finish();
        assert_eq!(5, array.len());
        assert_eq!(0, array.offset());
        assert_eq!(0, array.null_count());
        for (idx, expected) in (0..5_i32).enumerate() {
            assert!(!array.is_null(idx));
            assert!(array.is_valid(idx));
            assert_eq!(expected, array.value(idx));
        }
    }

    // Appending through the trusted-length iterator path gives the same result.
    #[test]
    fn test_primitive_array_builder_i32_append_iter() {
        let mut builder = Int32Array::builder(5);
        // SAFETY: a `Range` reports an exact upper bound in `size_hint`.
        unsafe { builder.append_trusted_len_iter(0..5) };
        let array = builder.finish();
        assert_eq!(5, array.len());
        assert_eq!(0, array.offset());
        assert_eq!(0, array.null_count());
        for (idx, expected) in (0..5_i32).enumerate() {
            assert!(!array.is_null(idx));
            assert!(array.is_valid(idx));
            assert_eq!(expected, array.value(idx));
        }
    }

    // Bulk-appending nulls yields an all-null array.
    #[test]
    fn test_primitive_array_builder_i32_append_nulls() {
        let mut builder = Int32Array::builder(5);
        builder.append_nulls(5);
        let array = builder.finish();
        assert_eq!(5, array.len());
        assert_eq!(0, array.offset());
        assert_eq!(5, array.null_count());
        for idx in 0..5 {
            assert!(array.is_null(idx));
            assert!(!array.is_valid(idx));
        }
    }

    // Date32 builder round-trips its i32 values.
    #[test]
    fn test_primitive_array_builder_date32() {
        let mut builder = Date32Array::builder(5);
        (0..5).for_each(|v| builder.append_value(v));
        let array = builder.finish();
        assert_eq!(5, array.len());
        assert_eq!(0, array.offset());
        assert_eq!(0, array.null_count());
        for (idx, expected) in (0..5_i32).enumerate() {
            assert!(!array.is_null(idx));
            assert!(array.is_valid(idx));
            assert_eq!(expected, array.value(idx));
        }
    }

    // Timestamp(second) builder round-trips its i64 values.
    #[test]
    fn test_primitive_array_builder_timestamp_second() {
        let mut builder = TimestampSecondArray::builder(5);
        (0..5).for_each(|v| builder.append_value(v));
        let array = builder.finish();
        assert_eq!(5, array.len());
        assert_eq!(0, array.offset());
        assert_eq!(0, array.null_count());
        for (idx, expected) in (0..5_i64).enumerate() {
            assert!(!array.is_null(idx));
            assert!(array.is_valid(idx));
            assert_eq!(expected, array.value(idx));
        }
    }

    // Boolean builder packs bits 3, 6 and 9 into the expected bytes.
    #[test]
    fn test_primitive_array_builder_bool() {
        // Bits 3 and 6 set in byte 0 (8 + 64 = 72), bit 9 is bit 1 of byte 1.
        let expected_bits = Buffer::from([72_u8, 2_u8]);
        let mut builder = BooleanArray::builder(10);
        for i in 0..10 {
            builder.append_value(matches!(i, 3 | 6 | 9));
        }
        let array = builder.finish();
        assert_eq!(&expected_bits, array.values().inner());
        assert_eq!(10, array.len());
        assert_eq!(0, array.offset());
        assert_eq!(0, array.null_count());
        for i in 0..10 {
            assert!(!array.is_null(i));
            assert!(array.is_valid(i));
            assert_eq!(matches!(i, 3 | 6 | 9), array.value(i), "failed at {i}")
        }
    }

    // `append_option` produces the same array as `From<Vec<Option<_>>>`.
    #[test]
    fn test_primitive_array_builder_append_option() {
        let expected = Int32Array::from(vec![Some(0), None, Some(2), None, Some(4)]);
        let mut builder = Int32Array::builder(5);
        for item in [Some(0), None, Some(2), None, Some(4)] {
            builder.append_option(item);
        }
        let built = builder.finish();
        assert_same_five_elements(&expected, &built);
    }

    // Mixing `append_value` and `append_null` matches the reference array.
    #[test]
    fn test_primitive_array_builder_append_null() {
        let expected = Int32Array::from(vec![Some(0), Some(2), None, None, Some(4)]);
        let mut builder = Int32Array::builder(5);
        builder.append_value(0);
        builder.append_value(2);
        builder.append_null();
        builder.append_null();
        builder.append_value(4);
        let built = builder.finish();
        assert_same_five_elements(&expected, &built);
    }

    // `append_slice` followed by nulls and a value matches the reference array.
    #[test]
    fn test_primitive_array_builder_append_slice() {
        let expected = Int32Array::from(vec![Some(0), Some(2), None, None, Some(4)]);
        let mut builder = Int32Array::builder(5);
        builder.append_slice(&[0, 2]);
        builder.append_null();
        builder.append_null();
        builder.append_value(4);
        let built = builder.finish();
        assert_same_five_elements(&expected, &built);
    }

    // `finish` empties the builder so it can be reused.
    #[test]
    fn test_primitive_array_builder_finish() {
        let mut builder = Int32Builder::new();

        builder.append_slice(&[2, 4, 6, 8]);
        let first = builder.finish();
        assert_eq!(4, first.len());
        assert_eq!(0, builder.len());

        builder.append_slice(&[1, 3, 5, 7, 9]);
        let second = builder.finish();
        assert_eq!(5, second.len());
        assert_eq!(0, builder.len());
    }

    // `finish_cloned` snapshots the contents without resetting the builder.
    #[test]
    fn test_primitive_array_builder_finish_cloned() {
        let mut builder = Int32Builder::new();
        for v in [23, 45] {
            builder.append_value(v);
        }
        let snapshot = builder.finish_cloned();
        assert_eq!(snapshot, Int32Array::from(vec![23, 45]));

        builder.append_value(56);
        assert_eq!(builder.finish_cloned(), Int32Array::from(vec![23, 45, 56]));

        builder.append_slice(&[2, 4, 6, 8]);
        let first = builder.finish();
        assert_eq!(7, first.len());
        assert_eq!(first, Int32Array::from(vec![23, 45, 56, 2, 4, 6, 8]));
        assert_eq!(0, builder.len());

        builder.append_slice(&[1, 3, 5, 7, 9]);
        let second = builder.finish();
        assert_eq!(5, second.len());
        assert_eq!(0, builder.len());
    }

    // `with_data_type` overrides the data type carried by the finished array.
    #[test]
    fn test_primitive_array_builder_with_data_type() {
        let mut decimal_builder =
            Decimal128Builder::new().with_data_type(DataType::Decimal128(1, 2));
        decimal_builder.append_value(1);
        let decimals = decimal_builder.finish();
        assert_eq!(decimals.precision(), 1);
        assert_eq!(decimals.scale(), 2);

        let data_type = DataType::Timestamp(TimeUnit::Nanosecond, Some("+00:00".into()));
        let mut timestamp_builder =
            TimestampNanosecondBuilder::new().with_data_type(data_type.clone());
        timestamp_builder.append_value(1);
        let timestamps = timestamp_builder.finish();
        assert_eq!(timestamps.data_type(), &data_type);
    }

    // An incompatible data type is rejected with a descriptive panic.
    #[test]
    #[should_panic(
        expected = "incompatible data type for builder, expected Int32 got Int64"
    )]
    fn test_invalid_with_data_type() {
        Int32Builder::new().with_data_type(DataType::Int64);
    }

    // `Extend` appends every `Some` item in order across multiple calls.
    #[test]
    fn test_extend() {
        let mut builder = PrimitiveBuilder::<Int16Type>::new();
        builder.extend([1, 2, 3, 5, 2, 4, 4].into_iter().map(Some));
        builder.extend([2, 4, 6, 2].into_iter().map(Some));
        let built = builder.finish();
        assert_eq!(built.values(), &[1, 2, 3, 5, 2, 4, 4, 2, 4, 6, 2]);
    }
}