use arrow::array::{Array, PrimitiveArray};
use arrow::bitmap::Bitmap;
use arrow::legacy::array::PolarsArray;
use arrow::legacy::data_types::{ArrayRef, IsFloat};
use arrow::legacy::slice::ExtremaNanAware;
use arrow::legacy::utils::CustomIterTools;
use arrow::types::NativeType;
use polars_core::prelude::*;
use polars_core::with_match_physical_numeric_polars_type;
use crate::chunked_array::list::namespace::has_inner_nulls;
fn min_between_offsets<T>(values: &[T], offset: &[i64]) -> PrimitiveArray<T>
where
    T: NativeType + PartialOrd + IsFloat,
{
    let mut running_offset = offset[0];
    (offset[1..])
        .iter()
        .map(|end| {
            let current_offset = running_offset;
            running_offset = *end;
            let slice = unsafe { values.get_unchecked(current_offset as usize..*end as usize) };
            slice.min_value_nan_aware().copied()
        })
        .collect_trusted()
}
fn dispatch_min<T>(arr: &dyn Array, offsets: &[i64], validity: Option<&Bitmap>) -> ArrayRef
where
    T: NativeType + PartialOrd + IsFloat,
{
    let values = arr.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap();
    let values = values.values().as_slice();
    let mut out = min_between_offsets(values, offsets);
    if let Some(validity) = validity {
        if out.has_validity() {
            out.apply_validity(|other_validity| validity & &other_validity)
        } else {
            out = out.with_validity(Some(validity.clone()));
        }
    }
    Box::new(out)
}
fn min_list_numerical(ca: &ListChunked, inner_type: &DataType) -> Series {
    use DataType::*;
    let chunks = ca
        .downcast_iter()
        .map(|arr| {
            let offsets = arr.offsets().as_slice();
            let values = arr.values().as_ref();
            match inner_type {
                Int8 => dispatch_min::<i8>(values, offsets, arr.validity()),
                Int16 => dispatch_min::<i16>(values, offsets, arr.validity()),
                Int32 => dispatch_min::<i32>(values, offsets, arr.validity()),
                Int64 => dispatch_min::<i64>(values, offsets, arr.validity()),
                UInt8 => dispatch_min::<u8>(values, offsets, arr.validity()),
                UInt16 => dispatch_min::<u16>(values, offsets, arr.validity()),
                UInt32 => dispatch_min::<u32>(values, offsets, arr.validity()),
                UInt64 => dispatch_min::<u64>(values, offsets, arr.validity()),
                Float32 => dispatch_min::<f32>(values, offsets, arr.validity()),
                Float64 => dispatch_min::<f64>(values, offsets, arr.validity()),
                _ => unimplemented!(),
            }
        })
        .collect::<Vec<_>>();
    Series::try_from((ca.name(), chunks)).unwrap()
}
pub(super) fn list_min_function(ca: &ListChunked) -> Series {
    fn inner(ca: &ListChunked) -> Series {
        match ca.inner_dtype() {
            DataType::Boolean => {
                let out: BooleanChunked = ca
                    .apply_amortized_generic(|s| s.and_then(|s| s.as_ref().bool().unwrap().min()));
                out.into_series()
            },
            dt if dt.is_numeric() => {
                with_match_physical_numeric_polars_type!(dt, |$T| {
                    let out: ChunkedArray<$T> = ca.apply_amortized_generic(|opt_s| {
                            let s = opt_s?;
                            let ca: &ChunkedArray<$T> = s.as_ref().as_ref().as_ref();
                            ca.min()
                    });
                    out.into_series()
                })
            },
            _ => ca
                .apply_amortized(|s| s.as_ref().min_as_series())
                .explode()
                .unwrap()
                .into_series(),
        }
    }
    if has_inner_nulls(ca) {
        return inner(ca);
    };
    match ca.inner_dtype() {
        dt if dt.is_numeric() => min_list_numerical(ca, &dt),
        _ => inner(ca),
    }
}
fn max_between_offsets<T>(values: &[T], offset: &[i64]) -> PrimitiveArray<T>
where
    T: NativeType + PartialOrd + IsFloat,
{
    let mut running_offset = offset[0];
    (offset[1..])
        .iter()
        .map(|end| {
            let current_offset = running_offset;
            running_offset = *end;
            let slice = unsafe { values.get_unchecked(current_offset as usize..*end as usize) };
            slice.max_value_nan_aware().copied()
        })
        .collect_trusted()
}
fn dispatch_max<T>(arr: &dyn Array, offsets: &[i64], validity: Option<&Bitmap>) -> ArrayRef
where
    T: NativeType + PartialOrd + IsFloat,
{
    let values = arr.as_any().downcast_ref::<PrimitiveArray<T>>().unwrap();
    let values = values.values().as_slice();
    let mut out = max_between_offsets(values, offsets);
    if let Some(validity) = validity {
        if out.has_validity() {
            out.apply_validity(|other_validity| validity & &other_validity)
        } else {
            out = out.with_validity(Some(validity.clone()));
        }
    }
    Box::new(out)
}
fn max_list_numerical(ca: &ListChunked, inner_type: &DataType) -> Series {
    use DataType::*;
    let chunks = ca
        .downcast_iter()
        .map(|arr| {
            let offsets = arr.offsets().as_slice();
            let values = arr.values().as_ref();
            match inner_type {
                Int8 => dispatch_max::<i8>(values, offsets, arr.validity()),
                Int16 => dispatch_max::<i16>(values, offsets, arr.validity()),
                Int32 => dispatch_max::<i32>(values, offsets, arr.validity()),
                Int64 => dispatch_max::<i64>(values, offsets, arr.validity()),
                UInt8 => dispatch_max::<u8>(values, offsets, arr.validity()),
                UInt16 => dispatch_max::<u16>(values, offsets, arr.validity()),
                UInt32 => dispatch_max::<u32>(values, offsets, arr.validity()),
                UInt64 => dispatch_max::<u64>(values, offsets, arr.validity()),
                Float32 => dispatch_max::<f32>(values, offsets, arr.validity()),
                Float64 => dispatch_max::<f64>(values, offsets, arr.validity()),
                _ => unimplemented!(),
            }
        })
        .collect::<Vec<_>>();
    Series::try_from((ca.name(), chunks)).unwrap()
}
pub(super) fn list_max_function(ca: &ListChunked) -> Series {
    fn inner(ca: &ListChunked) -> Series {
        match ca.inner_dtype() {
            DataType::Boolean => {
                let out: BooleanChunked = ca
                    .apply_amortized_generic(|s| s.and_then(|s| s.as_ref().bool().unwrap().max()));
                out.into_series()
            },
            dt if dt.is_numeric() => {
                with_match_physical_numeric_polars_type!(dt, |$T| {
                    let out: ChunkedArray<$T> = ca.apply_amortized_generic(|opt_s| {
                            let s = opt_s?;
                            let ca: &ChunkedArray<$T> = s.as_ref().as_ref().as_ref();
                            ca.max()
                    });
                    out.into_series()
                })
            },
            _ => ca
                .apply_amortized(|s| s.as_ref().max_as_series())
                .explode()
                .unwrap()
                .into_series(),
        }
    }
    if has_inner_nulls(ca) {
        return inner(ca);
    };
    match ca.inner_dtype() {
        dt if dt.is_numeric() => max_list_numerical(ca, &dt),
        _ => inner(ca),
    }
}