use std::borrow::Cow;
use std::convert::TryFrom;
use arrow::array::{BooleanArray, PrimitiveArray};
use arrow::bitmap::utils::{get_bit_unchecked, set_bit_unchecked};
use arrow::bitmap::Bitmap;
use arrow::trusted_len::TrustedLen;
use arrow::types::NativeType;
use polars_arrow::bitmap::unary_mut;
use polars_arrow::trusted_len::TrustedLenPush;
use crate::prelude::*;
use crate::series::IsSorted;
use crate::utils::{CustomIterTools, NoNull};
/// Collect an iterator of values into a [`PrimitiveArray`] and attach the
/// supplied validity bitmap (or none at all when `validity` is `None`).
fn collect_array<T: NativeType, I: TrustedLen<Item = T>>(
    iter: I,
    validity: Option<Bitmap>,
) -> PrimitiveArray<T> {
    let values = PrimitiveArray::from_trusted_len_values_iter(iter);
    values.with_validity(validity)
}
// Map the fallible function `$f` over `$self` and collect the outcome.
//
// * With a validity mask every item is an `Option`; the function is applied
//   to the `Some` payloads and the `Option<Result<_, _>>` is transposed so the
//   collected items are `Result<Option<_>, _>`.
// * Without a validity mask the plain values are mapped directly.
//
// The collection target is decided by the call site.
macro_rules! try_apply {
    ($self:expr, $f:expr) => {{
        if $self.has_validity() {
            $self
                .into_iter()
                .map(|opt_v| opt_v.map($f).transpose())
                .collect()
        } else {
            $self.into_no_null_iter().map($f).collect()
        }
    }};
}
// Map `$f` over `$self` and collect the result with `collect_trusted`.
//
// With a validity mask the function is applied only to the `Some` payloads
// (the items stay `Option`s); without one the plain values are mapped.
macro_rules! apply {
    ($self:expr, $f:expr) => {{
        if $self.has_validity() {
            $self
                .into_iter()
                .map(|opt_v| opt_v.map($f))
                .collect_trusted()
        } else {
            $self.into_no_null_iter().map($f).collect_trusted()
        }
    }};
}
// Map `$f` over `(index, value)` pairs of `$self` and collect the result with
// `collect_trusted`.
//
// With a validity mask the index counts every slot (null slots included) and
// `$f` is only invoked for the `Some` payloads; without one the enumerated
// plain values are mapped directly.
macro_rules! apply_enumerate {
    ($self:expr, $f:expr) => {{
        if $self.has_validity() {
            $self
                .into_iter()
                .enumerate()
                .map(|(idx, opt_v)| opt_v.map(|v| $f((idx, v))))
                .collect_trusted()
        } else {
            $self
                .into_no_null_iter()
                .enumerate()
                .map($f)
                .collect_trusted()
        }
    }};
}
/// Apply `f` to every value of `chunks`, reusing the existing buffers where
/// possible, and assemble the result into a `ChunkedArray<S>` called `name`.
///
/// Each chunk is downcast to `PrimitiveArray<S::Native>` (panicking if it has
/// a different concrete type). A chunk is mutated in place only when its
/// values buffer is not sliced and `into_mut` hands back a mutable array;
/// otherwise a new array is computed with `arrow::compute::arity::unary`.
fn apply_in_place_impl<S, F>(name: &str, chunks: Vec<ArrayRef>, f: F) -> ChunkedArray<S>
where
    F: Fn(S::Native) -> S::Native + Copy,
    S: PolarsNumericType,
{
    use arrow::Either::{Left, Right};

    // Fallback path: compute the mapped values into a freshly built array.
    let map_into_new = |src: &PrimitiveArray<S::Native>| {
        arrow::compute::arity::unary(src, f, S::get_dtype().to_arrow())
    };

    let mapped = chunks.into_iter().map(|chunk| {
        let typed = chunk
            .as_any()
            .downcast_ref::<PrimitiveArray<S::Native>>()
            .unwrap()
            .clone();
        // Release the type-erased handle before attempting `into_mut`, so it
        // does not count as an additional owner of the chunk.
        drop(chunk);

        if typed.values().is_sliced() {
            map_into_new(&typed)
        } else {
            match typed.into_mut() {
                Left(shared) => map_into_new(&shared),
                Right(mut exclusive) => {
                    for v in exclusive.values_mut_slice() {
                        *v = f(*v);
                    }
                    exclusive.into()
                },
            }
        }
    });
    ChunkedArray::from_chunk_iter(name, mapped)
}
impl<T: PolarsNumericType> ChunkedArray<T> {
    /// Cast this array to the numeric type `S` and apply `f` to the cast
    /// values via [`apply_in_place_impl`].
    ///
    /// # Panics
    /// Panics if the cast to `S` fails.
    pub fn cast_and_apply_in_place<F, S>(&self, f: F) -> ChunkedArray<S>
    where
        F: Fn(S::Native) -> S::Native + Copy,
        S: PolarsNumericType,
    {
        let casted = self.cast(&S::get_dtype()).unwrap();
        let chunks = casted.chunks().clone();
        // Let go of the cast result before mapping so only the cloned chunk
        // handles are left alive when the in-place path is attempted.
        drop(casted);
        apply_in_place_impl(self.name(), chunks, f)
    }

    /// Consume this array and apply `f` to its values via
    /// [`apply_in_place_impl`], keeping the array's name.
    pub fn apply_in_place<F>(mut self, f: F) -> Self
    where
        F: Fn(T::Native) -> T::Native + Copy,
    {
        // Move the chunks out of `self`, leaving an empty chunk list behind.
        let owned_chunks = std::mem::take(&mut self.chunks);
        apply_in_place_impl(self.name(), owned_chunks, f)
    }
}
impl<T: PolarsNumericType> ChunkedArray<T> {
    /// Apply `f` to every value of every chunk through
    /// `arrow::compute::arity_assign::unary`, then mark the array as not
    /// sorted because `f` may reorder the values arbitrarily.
    pub fn apply_mut<F>(&mut self, f: F)
    where
        F: Fn(T::Native) -> T::Native + Copy,
    {
        // SAFETY: the chunks are only handed to a value-wise assignment
        // kernel. NOTE(review): relies on that kernel never changing a
        // chunk's length — confirm.
        unsafe {
            for arr in self.downcast_iter_mut() {
                arrow::compute::arity_assign::unary(arr, f);
            }
        }
        self.set_sorted_flag(IsSorted::Not);
    }
}
impl<'a, T> ChunkApply<'a, T::Native, T::Native> for ChunkedArray<T>
where
    T: PolarsNumericType,
{
    /// Map every stored value through `f` into the numeric type `S`, chunk by
    /// chunk, carrying each chunk's validity bitmap over unchanged.
    fn apply_cast_numeric<F, S>(&self, f: F) -> ChunkedArray<S>
    where
        F: Fn(T::Native) -> S::Native + Copy,
        S: PolarsNumericType,
    {
        let validities = self.iter_validities();
        let chunks = self.data_views().zip(validities).map(|(values, validity)| {
            let mapped = values.iter().map(|&v| f(v));
            collect_array(mapped, validity.cloned())
        });
        ChunkedArray::from_chunk_iter(self.name(), chunks)
    }

    /// Map every slot (null slots are passed as `None`) through `f` into the
    /// numeric type `S`. The produced chunks carry no validity bitmap.
    fn branch_apply_cast_numeric_no_null<F, S>(&self, f: F) -> ChunkedArray<S>
    where
        F: Fn(Option<T::Native>) -> S::Native,
        S: PolarsNumericType,
    {
        let chunks = self.downcast_iter().map(|array| {
            if array.null_count() > 0 {
                // Go through the validity-aware iterator so nulls reach `f`.
                let mapped = array.into_iter().map(|opt_v| f(opt_v.copied()));
                collect_array(mapped, None)
            } else {
                // No nulls: read the values buffer directly.
                let mapped = array.values().iter().map(|&v| f(Some(v)));
                collect_array(mapped, None)
            }
        });
        ChunkedArray::from_chunk_iter(self.name(), chunks)
    }

    /// Map every stored value through `f`, chunk by chunk, carrying each
    /// chunk's validity bitmap over unchanged.
    fn apply<F>(&'a self, f: F) -> Self
    where
        F: Fn(T::Native) -> T::Native + Copy,
    {
        let validities = self.iter_validities();
        let chunks = self.data_views().zip(validities).map(|(values, validity)| {
            let mapped = values.iter().map(|&v| f(v));
            collect_array(mapped, validity.cloned())
        });
        ChunkedArray::from_chunk_iter(self.name(), chunks)
    }

    /// Fallible variant of `apply`: the first `Err` returned by `f` aborts the
    /// operation and is returned. On success the result is renamed to
    /// `self.name()`.
    fn try_apply<F>(&'a self, f: F) -> PolarsResult<Self>
    where
        F: Fn(T::Native) -> PolarsResult<T::Native> + Copy,
    {
        let mut out: ChunkedArray<T> = self
            .data_views()
            .zip(self.iter_validities())
            .map(|(values, validity)| {
                values
                    .iter()
                    .copied()
                    .map(f)
                    .collect::<PolarsResult<Vec<_>>>()
                    .map(|mapped| (mapped, validity.cloned()))
            })
            .collect::<PolarsResult<_>>()?;
        out.rename(self.name());
        Ok(out)
    }

    /// Map every slot through `f`, which both receives and returns an
    /// `Option`, so it decides the validity of each output slot itself.
    fn apply_on_opt<F>(&'a self, f: F) -> Self
    where
        F: Fn(Option<T::Native>) -> Option<T::Native> + Copy,
    {
        let chunks = self.downcast_iter().map(|arr| {
            PrimitiveArray::<T::Native>::from_trusted_len_iter(
                arr.into_iter().map(|opt_v| f(opt_v.copied())),
            )
        });
        Self::from_chunk_iter(self.name(), chunks)
    }

    /// Map every non-null value together with its position through `f`.
    /// Positions count all slots, including null ones.
    fn apply_with_idx<F>(&'a self, f: F) -> Self
    where
        F: Fn((usize, T::Native)) -> T::Native + Copy,
    {
        if self.has_validity() {
            // SAFETY: NOTE(review) — assumes the flattened chunks yield
            // exactly `self.len()` items in total; confirm.
            unsafe {
                self.downcast_iter()
                    .flatten()
                    .trust_my_length(self.len())
                    .enumerate()
                    .map(|(idx, opt_v)| opt_v.map(|v| f((idx, *v))))
                    .collect_trusted()
            }
        } else {
            let dense: NoNull<Self> = self
                .into_no_null_iter()
                .enumerate()
                .map(f)
                .collect_trusted();
            dense.into_inner()
        }
    }

    /// Map every slot together with its position through `f`, which both
    /// receives and returns an `Option`.
    fn apply_with_idx_on_opt<F>(&'a self, f: F) -> Self
    where
        F: Fn((usize, Option<T::Native>)) -> Option<T::Native> + Copy,
    {
        // SAFETY: NOTE(review) — assumes the flattened chunks yield exactly
        // `self.len()` items in total; confirm.
        unsafe {
            self.downcast_iter()
                .flatten()
                .trust_my_length(self.len())
                .enumerate()
                .map(|(idx, opt_v)| f((idx, opt_v.copied())))
                .collect_trusted()
        }
    }

    /// Overwrite the first `self.len()` entries of `slice`: entry `i` becomes
    /// `f(value_i, &slice[i])`.
    ///
    /// # Panics
    /// Panics if `slice` is shorter than `self`.
    fn apply_to_slice<F, V>(&'a self, f: F, slice: &mut [V])
    where
        F: Fn(Option<T::Native>, &V) -> V,
    {
        assert!(slice.len() >= self.len());
        let values = self.downcast_iter().flat_map(|arr| arr.into_iter());
        for (item, opt_val) in slice.iter_mut().zip(values) {
            *item = f(opt_val.copied(), item);
        }
    }
}
impl<'a> ChunkApply<'a, bool, bool> for BooleanChunked {
    /// Map every stored bit through `f` into the numeric type `S`, carrying
    /// each chunk's validity bitmap over unchanged.
    fn apply_cast_numeric<F, S>(&self, f: F) -> ChunkedArray<S>
    where
        F: Fn(bool) -> S::Native + Copy,
        S: PolarsNumericType,
    {
        self.apply_kernel_cast(&|array: &BooleanArray| {
            let mapped = Vec::<_>::from_trusted_len_iter(array.values().iter().map(f));
            to_array::<S>(mapped, array.validity().cloned())
        })
    }

    /// Map every slot (null slots are passed as `None`) through `f` into the
    /// numeric type `S`. The produced chunks carry no validity bitmap.
    fn branch_apply_cast_numeric_no_null<F, S>(&self, f: F) -> ChunkedArray<S>
    where
        F: Fn(Option<bool>) -> S::Native + Copy,
        S: PolarsNumericType,
    {
        self.apply_kernel_cast(&|array: &BooleanArray| {
            let mapped = array.into_iter().map(f);
            to_array::<S>(Vec::<_>::from_trusted_len_iter(mapped), None)
        })
    }

    /// Map every stored bit through `f`, working on the values bitmap one
    /// 64-bit word at a time and keeping each chunk's validity bitmap.
    fn apply<F>(&self, f: F) -> Self
    where
        F: Fn(bool) -> bool + Copy,
    {
        self.apply_kernel(&|arr| {
            let values = arrow::bitmap::unary(arr.values(), |word| {
                let src = word.to_ne_bytes();
                let mut dst = 0u64.to_ne_bytes();
                for bit in 0..64 {
                    // SAFETY: `bit < 64` and both buffers hold 8 bytes.
                    unsafe {
                        let mapped = f(get_bit_unchecked(&src, bit));
                        set_bit_unchecked(&mut dst, bit, mapped)
                    };
                }
                u64::from_ne_bytes(dst)
            });
            BooleanArray::from_data_default(values, arr.validity().cloned()).boxed()
        })
    }

    /// Fallible variant of `apply`.
    ///
    /// Bits of a word are visited from the highest index down and a later
    /// error overwrites an earlier one, so the error kept for a word is the
    /// one raised by its lowest failing bit. Once an error is recorded, the
    /// remaining words are skipped.
    ///
    /// NOTE(review): `f` is invoked for all 64 bit positions of every word the
    /// kernel passes in; whether such a word can contain bits beyond the
    /// array's length depends on `unary_mut` — confirm.
    fn try_apply<F>(&self, f: F) -> PolarsResult<Self>
    where
        F: Fn(bool) -> PolarsResult<bool> + Copy,
    {
        let mut failed: Option<PolarsError> = None;
        let chunks = self.downcast_iter().map(|arr| {
            let values = unary_mut(arr.values(), |word| {
                if failed.is_some() {
                    return 0;
                }
                let src = word.to_ne_bytes();
                let mut dst = 0u64.to_ne_bytes();
                for bit in (0..64).rev() {
                    // SAFETY: `bit < 64` and both buffers hold 8 bytes.
                    unsafe {
                        match f(get_bit_unchecked(&src, bit)) {
                            Ok(mapped) => set_bit_unchecked(&mut dst, bit, mapped),
                            Err(e) => failed = Some(e),
                        }
                    };
                }
                u64::from_ne_bytes(dst)
            });
            BooleanArray::from_data_default(values, arr.validity().cloned())
        });
        let out = BooleanChunked::from_chunk_iter(self.name(), chunks);
        match failed {
            Some(e) => Err(e),
            None => Ok(out),
        }
    }

    /// Map every slot through `f`, which both receives and returns an
    /// `Option`.
    fn apply_on_opt<F>(&'a self, f: F) -> Self
    where
        F: Fn(Option<bool>) -> Option<bool> + Copy,
    {
        let slots = self.into_iter();
        slots.map(f).collect_trusted()
    }

    /// Map every non-null value together with its position through `f`.
    fn apply_with_idx<F>(&'a self, f: F) -> Self
    where
        F: Fn((usize, bool)) -> bool + Copy,
    {
        apply_enumerate!(self, f)
    }

    /// Map every slot together with its position through `f`, which both
    /// receives and returns an `Option`.
    fn apply_with_idx_on_opt<F>(&'a self, f: F) -> Self
    where
        F: Fn((usize, Option<bool>)) -> Option<bool> + Copy,
    {
        let indexed = self.into_iter().enumerate();
        indexed.map(f).collect_trusted()
    }

    /// Overwrite the first `self.len()` entries of `slice`: entry `i` becomes
    /// `f(value_i, &slice[i])`.
    ///
    /// # Panics
    /// Panics if `slice` is shorter than `self`.
    fn apply_to_slice<F, T>(&'a self, f: F, slice: &mut [T])
    where
        F: Fn(Option<bool>, &T) -> T,
    {
        assert!(slice.len() >= self.len());
        let values = self.downcast_iter().flat_map(|arr| arr.into_iter());
        for (item, opt_val) in slice.iter_mut().zip(values) {
            *item = f(opt_val, item);
        }
    }
}
impl Utf8Chunked {
    /// Map every stored string through the stateful closure `f`, which must
    /// return a string slice borrowed for `'a`, and rebuild the chunks with
    /// their original validity bitmaps.
    pub fn apply_mut<'a, F>(&'a self, mut f: F) -> Self
    where
        F: FnMut(&'a str) -> &'a str,
    {
        use polars_arrow::array::utf8::Utf8FromIter;
        let chunks = self.downcast_iter().map(|arr| {
            // Size estimate handed to the builder: the source's values size
            // scaled by 1.3.
            let estimated_bytes = (arr.get_values_size() as f64 * 1.3) as usize;
            let mapped = arr.values_iter().map(&mut f);
            Utf8Array::<i64>::from_values_iter(mapped, arr.len(), estimated_bytes)
                .with_validity(arr.validity().cloned())
        });
        Utf8Chunked::from_chunk_iter(self.name(), chunks)
    }
}
impl BinaryChunked {
    /// Map every stored byte string through the stateful closure `f`, which
    /// must return a byte slice borrowed for `'a`, and rebuild the chunks with
    /// their original validity bitmaps.
    pub fn apply_mut<'a, F>(&'a self, mut f: F) -> Self
    where
        F: FnMut(&'a [u8]) -> &'a [u8],
    {
        use polars_arrow::array::utf8::BinaryFromIter;
        let chunks = self.downcast_iter().map(|arr| {
            // Size estimate handed to the builder: the source's values size
            // scaled by 1.3.
            let estimated_bytes = (arr.get_values_size() as f64 * 1.3) as usize;
            let mapped = arr.values_iter().map(&mut f);
            BinaryArray::<i64>::from_values_iter(mapped, arr.len(), estimated_bytes)
                .with_validity(arr.validity().cloned())
        });
        BinaryChunked::from_chunk_iter(self.name(), chunks)
    }
}
impl<'a> ChunkApply<'a, &'a str, Cow<'a, str>> for Utf8Chunked {
    /// Map every stored string through `f` into the numeric type `S`,
    /// carrying each chunk's validity bitmap over unchanged.
    fn apply_cast_numeric<F, S>(&'a self, f: F) -> ChunkedArray<S>
    where
        F: Fn(&'a str) -> S::Native + Copy,
        S: PolarsNumericType,
    {
        let chunks = self.downcast_iter().map(|array| {
            let values = array.values_iter().map(f);
            collect_array(values, array.validity().cloned())
        });
        ChunkedArray::from_chunk_iter(self.name(), chunks)
    }

    /// Map every slot (null slots are passed as `None`) through `f` into the
    /// numeric type `S`. The produced chunks carry no validity bitmap.
    fn branch_apply_cast_numeric_no_null<F, S>(&'a self, f: F) -> ChunkedArray<S>
    where
        F: Fn(Option<&'a str>) -> S::Native + Copy,
        S: PolarsNumericType,
    {
        let chunks = self.downcast_iter().map(|array| {
            let values = array.into_iter().map(f);
            // `f` already produced a value for every null slot; re-attaching
            // the source validity would mask those values as null again, so
            // the output gets no validity (as in the numeric and boolean
            // implementations).
            collect_array(values, None)
        });
        ChunkedArray::from_chunk_iter(self.name(), chunks)
    }

    /// Map every stored string through `f` and rebuild the chunks with their
    /// original validity bitmaps.
    fn apply<F>(&'a self, f: F) -> Self
    where
        F: Fn(&'a str) -> Cow<'a, str> + Copy,
    {
        use polars_arrow::array::utf8::Utf8FromIter;
        let chunks = self.downcast_iter().map(|arr| {
            let iter = arr.values_iter().map(f);
            // Size estimate handed to the builder: the source's values size
            // scaled by 1.3.
            let size_hint = (arr.get_values_size() as f64 * 1.3) as usize;
            let new = Utf8Array::<i64>::from_values_iter(iter, arr.len(), size_hint);
            new.with_validity(arr.validity().cloned())
        });
        Utf8Chunked::from_chunk_iter(self.name(), chunks)
    }

    /// Fallible variant of `apply`; see the `try_apply!` macro for the
    /// null / no-null branches.
    fn try_apply<F>(&'a self, f: F) -> PolarsResult<Self>
    where
        F: Fn(&'a str) -> PolarsResult<Cow<'a, str>> + Copy,
    {
        try_apply!(self, f)
    }

    /// Map every slot through `f`, which both receives and returns an
    /// `Option`. The result is renamed to `self.name()`.
    fn apply_on_opt<F>(&'a self, f: F) -> Self
    where
        F: Fn(Option<&'a str>) -> Option<Cow<'a, str>> + Copy,
    {
        let mut ca: Self = self.into_iter().map(f).collect_trusted();
        ca.rename(self.name());
        ca
    }

    /// Map every non-null value together with its position through `f`.
    fn apply_with_idx<F>(&'a self, f: F) -> Self
    where
        F: Fn((usize, &'a str)) -> Cow<'a, str> + Copy,
    {
        apply_enumerate!(self, f)
    }

    /// Map every slot together with its position through `f`, which both
    /// receives and returns an `Option`. The result is renamed to
    /// `self.name()`.
    fn apply_with_idx_on_opt<F>(&'a self, f: F) -> Self
    where
        F: Fn((usize, Option<&'a str>)) -> Option<Cow<'a, str>> + Copy,
    {
        let mut ca: Self = self.into_iter().enumerate().map(f).collect_trusted();
        ca.rename(self.name());
        ca
    }

    /// Overwrite the first `self.len()` entries of `slice`: entry `i` becomes
    /// `f(value_i, &slice[i])`.
    ///
    /// # Panics
    /// Panics if `slice` is shorter than `self`.
    fn apply_to_slice<F, T>(&'a self, f: F, slice: &mut [T])
    where
        F: Fn(Option<&'a str>, &T) -> T,
    {
        assert!(slice.len() >= self.len());
        let mut idx = 0;
        self.downcast_iter().for_each(|arr| {
            arr.into_iter().for_each(|opt_val| {
                // SAFETY: `idx` counts the items yielded so far and the assert
                // above requires `slice` to be at least `self.len()` long.
                let item = unsafe { slice.get_unchecked_mut(idx) };
                *item = f(opt_val, item);
                idx += 1;
            })
        });
    }
}
impl<'a> ChunkApply<'a, &'a [u8], Cow<'a, [u8]>> for BinaryChunked {
    /// Map every stored byte string through `f` into the numeric type `S`,
    /// carrying each chunk's validity bitmap over unchanged.
    fn apply_cast_numeric<F, S>(&'a self, f: F) -> ChunkedArray<S>
    where
        F: Fn(&'a [u8]) -> S::Native + Copy,
        S: PolarsNumericType,
    {
        let chunks = self.downcast_iter().map(|array| {
            let values = array.values_iter().map(f);
            collect_array(values, array.validity().cloned())
        });
        ChunkedArray::from_chunk_iter(self.name(), chunks)
    }

    /// Map every slot (null slots are passed as `None`) through `f` into the
    /// numeric type `S`. The produced chunks carry no validity bitmap.
    fn branch_apply_cast_numeric_no_null<F, S>(&'a self, f: F) -> ChunkedArray<S>
    where
        F: Fn(Option<&'a [u8]>) -> S::Native + Copy,
        S: PolarsNumericType,
    {
        // `f` already produced a value for every null slot; re-attaching the
        // source validity would mask those values as null again, so the
        // output gets no validity (as in the numeric and boolean
        // implementations).
        let chunks = self
            .downcast_iter()
            .map(|array| collect_array(array.into_iter().map(f), None));
        ChunkedArray::from_chunk_iter(self.name(), chunks)
    }

    /// Map every non-null value through `f`; see the `apply!` macro for the
    /// null / no-null branches.
    fn apply<F>(&'a self, f: F) -> Self
    where
        F: Fn(&'a [u8]) -> Cow<'a, [u8]> + Copy,
    {
        apply!(self, f)
    }

    /// Fallible variant of `apply`; see the `try_apply!` macro for the
    /// null / no-null branches.
    fn try_apply<F>(&'a self, f: F) -> PolarsResult<Self>
    where
        F: Fn(&'a [u8]) -> PolarsResult<Cow<'a, [u8]>> + Copy,
    {
        try_apply!(self, f)
    }

    /// Map every slot through `f`, which both receives and returns an
    /// `Option`. The result is renamed to `self.name()`.
    fn apply_on_opt<F>(&'a self, f: F) -> Self
    where
        F: Fn(Option<&'a [u8]>) -> Option<Cow<'a, [u8]>> + Copy,
    {
        let mut ca: Self = self.into_iter().map(f).collect_trusted();
        ca.rename(self.name());
        ca
    }

    /// Map every non-null value together with its position through `f`.
    fn apply_with_idx<F>(&'a self, f: F) -> Self
    where
        F: Fn((usize, &'a [u8])) -> Cow<'a, [u8]> + Copy,
    {
        apply_enumerate!(self, f)
    }

    /// Map every slot together with its position through `f`, which both
    /// receives and returns an `Option`. The result is renamed to
    /// `self.name()`.
    fn apply_with_idx_on_opt<F>(&'a self, f: F) -> Self
    where
        F: Fn((usize, Option<&'a [u8]>)) -> Option<Cow<'a, [u8]>> + Copy,
    {
        let mut ca: Self = self.into_iter().enumerate().map(f).collect_trusted();
        ca.rename(self.name());
        ca
    }

    /// Overwrite the first `self.len()` entries of `slice`: entry `i` becomes
    /// `f(value_i, &slice[i])`.
    ///
    /// # Panics
    /// Panics if `slice` is shorter than `self`.
    fn apply_to_slice<F, T>(&'a self, f: F, slice: &mut [T])
    where
        F: Fn(Option<&'a [u8]>, &T) -> T,
    {
        assert!(slice.len() >= self.len());
        let mut idx = 0;
        self.downcast_iter().for_each(|arr| {
            arr.into_iter().for_each(|opt_val| {
                // SAFETY: `idx` counts the items yielded so far and the assert
                // above requires `slice` to be at least `self.len()` long.
                let item = unsafe { slice.get_unchecked_mut(idx) };
                *item = f(opt_val, item);
                idx += 1;
            })
        });
    }
}
impl ChunkApplyKernel<BooleanArray> for BooleanChunked {
    /// Run the kernel `f` on every chunk and wrap the produced arrays in a
    /// `BooleanChunked` with the same name.
    fn apply_kernel(&self, f: &dyn Fn(&BooleanArray) -> ArrayRef) -> Self {
        let chunks: Vec<ArrayRef> = self.downcast_iter().map(|arr| f(arr)).collect();
        // SAFETY: NOTE(review) — `from_chunks` does not check that the arrays
        // returned by `f` match the target dtype; the caller's kernel must
        // guarantee it. Confirm against `from_chunks`' contract.
        unsafe { Self::from_chunks(self.name(), chunks) }
    }

    /// Run the kernel `f` on every chunk and wrap the produced arrays in a
    /// `ChunkedArray<S>` with the same name.
    fn apply_kernel_cast<S>(&self, f: &dyn Fn(&BooleanArray) -> ArrayRef) -> ChunkedArray<S>
    where
        S: PolarsDataType,
    {
        let chunks: Vec<ArrayRef> = self.downcast_iter().map(|arr| f(arr)).collect();
        // SAFETY: NOTE(review) — the kernel must return arrays whose type
        // matches `S`; nothing here verifies it. Confirm.
        unsafe { ChunkedArray::<S>::from_chunks(self.name(), chunks) }
    }
}
impl<T> ChunkApplyKernel<PrimitiveArray<T::Native>> for ChunkedArray<T>
where
    T: PolarsNumericType,
{
    /// Run the kernel `f` on every chunk and wrap the produced arrays in a
    /// chunked array of the same type and name.
    fn apply_kernel(&self, f: &dyn Fn(&PrimitiveArray<T::Native>) -> ArrayRef) -> Self {
        let chunks: Vec<ArrayRef> = self.downcast_iter().map(f).collect();
        // SAFETY: NOTE(review) — the kernel must return arrays whose type
        // matches `T`; nothing here verifies it. Confirm.
        unsafe { Self::from_chunks(self.name(), chunks) }
    }

    /// Run the kernel `f` on every chunk and wrap the produced arrays in a
    /// `ChunkedArray<S>` with the same name.
    fn apply_kernel_cast<S>(
        &self,
        f: &dyn Fn(&PrimitiveArray<T::Native>) -> ArrayRef,
    ) -> ChunkedArray<S>
    where
        S: PolarsDataType,
    {
        let chunks: Vec<ArrayRef> = self.downcast_iter().map(f).collect();
        // SAFETY: NOTE(review) — the kernel must return arrays whose type
        // matches `S`; nothing here verifies it. Confirm.
        unsafe { ChunkedArray::from_chunks(self.name(), chunks) }
    }
}
impl ChunkApplyKernel<LargeStringArray> for Utf8Chunked {
    /// Run the kernel `f` on every chunk and wrap the produced arrays in a
    /// `Utf8Chunked` with the same name.
    fn apply_kernel(&self, f: &dyn Fn(&LargeStringArray) -> ArrayRef) -> Self {
        let chunks: Vec<ArrayRef> = self.downcast_iter().map(f).collect();
        // SAFETY: NOTE(review) — the kernel must return string arrays; nothing
        // here verifies it. Confirm.
        unsafe { Self::from_chunks(self.name(), chunks) }
    }

    /// Run the kernel `f` on every chunk and wrap the produced arrays in a
    /// `ChunkedArray<S>` with the same name.
    fn apply_kernel_cast<S>(&self, f: &dyn Fn(&LargeStringArray) -> ArrayRef) -> ChunkedArray<S>
    where
        S: PolarsDataType,
    {
        let chunks: Vec<ArrayRef> = self.downcast_iter().map(f).collect();
        // SAFETY: NOTE(review) — the kernel must return arrays whose type
        // matches `S`; nothing here verifies it. Confirm.
        unsafe { ChunkedArray::from_chunks(self.name(), chunks) }
    }
}
impl ChunkApplyKernel<LargeBinaryArray> for BinaryChunked {
    /// Run the kernel `f` on every chunk and wrap the produced arrays in a
    /// `BinaryChunked` with the same name.
    fn apply_kernel(&self, f: &dyn Fn(&LargeBinaryArray) -> ArrayRef) -> Self {
        let chunks: Vec<ArrayRef> = self.downcast_iter().map(f).collect();
        // SAFETY: NOTE(review) — the kernel must return binary arrays; nothing
        // here verifies it. Confirm.
        unsafe { Self::from_chunks(self.name(), chunks) }
    }

    /// Run the kernel `f` on every chunk and wrap the produced arrays in a
    /// `ChunkedArray<S>` with the same name.
    fn apply_kernel_cast<S>(&self, f: &dyn Fn(&LargeBinaryArray) -> ArrayRef) -> ChunkedArray<S>
    where
        S: PolarsDataType,
    {
        let chunks: Vec<ArrayRef> = self.downcast_iter().map(f).collect();
        // SAFETY: NOTE(review) — the kernel must return arrays whose type
        // matches `S`; nothing here verifies it. Confirm.
        unsafe { ChunkedArray::from_chunks(self.name(), chunks) }
    }
}
impl<'a> ChunkApply<'a, Series, Series> for ListChunked {
    /// Map every sub-list (wrapped in an unnamed `Series`) through `f` into
    /// the numeric type `S`, carrying each chunk's validity bitmap over
    /// unchanged.
    fn apply_cast_numeric<F, S>(&self, f: F) -> ChunkedArray<S>
    where
        F: Fn(Series) -> S::Native + Copy,
        S: PolarsNumericType,
    {
        let dtype = self.inner_dtype();
        let chunks = self.downcast_iter().map(|array| unsafe {
            // The trusted length must be the length of *this chunk*, not of
            // the whole chunked array; otherwise it is wrong whenever there
            // is more than one chunk.
            let len = array.len();
            let values = array
                .values_iter()
                .map(|arr| {
                    let series = Series::from_chunks_and_dtype_unchecked("", vec![arr], &dtype);
                    f(series)
                })
                .trust_my_length(len);
            collect_array(values, array.validity().cloned())
        });
        ChunkedArray::from_chunk_iter(self.name(), chunks)
    }

    /// Map every slot (null slots are passed as `None`) through `f` into the
    /// numeric type `S`. The produced chunks carry no validity bitmap.
    fn branch_apply_cast_numeric_no_null<F, S>(&self, f: F) -> ChunkedArray<S>
    where
        F: Fn(Option<Series>) -> S::Native + Copy,
        S: PolarsNumericType,
    {
        let dtype = self.inner_dtype();
        let chunks = self.downcast_iter().map(|array| unsafe {
            let values = array.iter().map(|x| {
                let x = x.map(|x| {
                    Series::from_chunks_and_dtype_unchecked("", vec![x], &dtype)
                });
                f(x)
            });
            let len = array.len();
            // `f` already produced a value for every null slot; re-attaching
            // the source validity would mask those values as null again, so
            // the output gets no validity (as in the numeric and boolean
            // implementations).
            collect_array(values.trust_my_length(len), None)
        });
        ChunkedArray::from_chunk_iter(self.name(), chunks)
    }

    /// Map every non-null sub-list through `f`.
    ///
    /// An empty array is returned as a clone. The fast-explode flag is set on
    /// the result only if `f` never returned an empty `Series`.
    fn apply<F>(&'a self, f: F) -> Self
    where
        F: Fn(Series) -> Series + Copy,
    {
        if self.is_empty() {
            return self.clone();
        }
        let mut fast_explode = true;
        // Wrap `f` to record whether any output was empty.
        let mut function = |s: Series| {
            let out = f(s);
            if out.is_empty() {
                fast_explode = false;
            }
            out
        };
        let mut ca: ListChunked = apply!(self, &mut function);
        if fast_explode {
            ca.set_fast_explode()
        }
        ca
    }

    /// Fallible variant of `apply`: an `Err` from `f` is propagated.
    ///
    /// An empty array is returned as a clone. The fast-explode flag is set on
    /// the result only if `f` never returned an empty `Series`.
    fn try_apply<F>(&'a self, f: F) -> PolarsResult<Self>
    where
        F: Fn(Series) -> PolarsResult<Series> + Copy,
    {
        if self.is_empty() {
            return Ok(self.clone());
        }
        let mut fast_explode = true;
        // Wrap `f` to record whether any successful output was empty.
        let mut function = |s: Series| {
            let out = f(s);
            if let Ok(out) = &out {
                if out.is_empty() {
                    fast_explode = false;
                }
            }
            out
        };
        let ca: PolarsResult<ListChunked> = try_apply!(self, &mut function);
        let mut ca = ca?;
        if fast_explode {
            ca.set_fast_explode()
        }
        Ok(ca)
    }

    /// Map every slot through `f`, which both receives and returns an
    /// `Option`. An empty array is returned as a clone.
    fn apply_on_opt<F>(&'a self, f: F) -> Self
    where
        F: Fn(Option<Series>) -> Option<Series> + Copy,
    {
        if self.is_empty() {
            return self.clone();
        }
        self.into_iter().map(f).collect_trusted()
    }

    /// Map every non-null sub-list together with its position through `f`.
    ///
    /// An empty array is returned as a clone. The fast-explode flag is set on
    /// the result only if `f` never returned an empty `Series`.
    fn apply_with_idx<F>(&'a self, f: F) -> Self
    where
        F: Fn((usize, Series)) -> Series + Copy,
    {
        if self.is_empty() {
            return self.clone();
        }
        let mut fast_explode = true;
        // Wrap `f` to record whether any output was empty.
        let mut function = |(idx, s)| {
            let out = f((idx, s));
            if out.is_empty() {
                fast_explode = false;
            }
            out
        };
        let mut ca: ListChunked = apply_enumerate!(self, function);
        if fast_explode {
            ca.set_fast_explode()
        }
        ca
    }

    /// Map every slot together with its position through `f`, which both
    /// receives and returns an `Option`.
    ///
    /// An empty array is returned as a clone. The fast-explode flag is set on
    /// the result only if `f` never returned `Some` empty `Series`.
    fn apply_with_idx_on_opt<F>(&'a self, f: F) -> Self
    where
        F: Fn((usize, Option<Series>)) -> Option<Series> + Copy,
    {
        if self.is_empty() {
            return self.clone();
        }
        let mut fast_explode = true;
        // Wrap `f` to record whether any `Some` output was empty.
        let function = |(idx, s)| {
            let out = f((idx, s));
            if let Some(out) = &out {
                if out.is_empty() {
                    fast_explode = false;
                }
            }
            out
        };
        let mut ca: ListChunked = self.into_iter().enumerate().map(function).collect_trusted();
        if fast_explode {
            ca.set_fast_explode()
        }
        ca
    }

    /// Overwrite the first `self.len()` entries of `slice`: entry `i` becomes
    /// `f(value_i, &slice[i])`, where non-null sub-lists are wrapped in an
    /// unnamed `Series`.
    ///
    /// # Panics
    /// Panics if `slice` is shorter than `self`, or if a sub-list cannot be
    /// converted into a `Series`.
    fn apply_to_slice<F, T>(&'a self, f: F, slice: &mut [T])
    where
        F: Fn(Option<Series>, &T) -> T,
    {
        assert!(slice.len() >= self.len());
        let mut idx = 0;
        self.downcast_iter().for_each(|arr| {
            arr.iter().for_each(|opt_val| {
                let opt_val = opt_val.map(|arrayref| Series::try_from(("", arrayref)).unwrap());
                // SAFETY: `idx` counts the items yielded so far and the assert
                // above requires `slice` to be at least `self.len()` long.
                let item = unsafe { slice.get_unchecked_mut(idx) };
                *item = f(opt_val, item);
                idx += 1;
            })
        });
    }
}
#[cfg(feature = "object")]
impl<'a, T> ChunkApply<'a, &'a T, T> for ObjectChunked<T>
where
    T: PolarsObject,
{
    /// Not implemented for object arrays; calling it panics.
    fn apply_cast_numeric<F, S>(&'a self, _f: F) -> ChunkedArray<S>
    where
        F: Fn(&'a T) -> S::Native + Copy,
        S: PolarsNumericType,
    {
        todo!()
    }

    /// Not implemented for object arrays; calling it panics.
    fn branch_apply_cast_numeric_no_null<F, S>(&'a self, _f: F) -> ChunkedArray<S>
    where
        F: Fn(Option<&'a T>) -> S::Native + Copy,
        S: PolarsNumericType,
    {
        todo!()
    }

    /// Map every non-null object through `f`; null slots stay null. The
    /// result is renamed to `self.name()`.
    fn apply<F>(&'a self, f: F) -> Self
    where
        F: Fn(&'a T) -> T + Copy,
    {
        let mapped = self.into_iter().map(|opt_v| opt_v.map(f));
        let mut out: ObjectChunked<T> = mapped.collect();
        out.rename(self.name());
        out
    }

    /// Not implemented for object arrays; calling it panics.
    fn try_apply<F>(&'a self, _f: F) -> PolarsResult<Self>
    where
        F: Fn(&'a T) -> PolarsResult<T> + Copy,
    {
        todo!()
    }

    /// Map every slot through `f`, which both receives and returns an
    /// `Option`. The result is renamed to `self.name()`.
    fn apply_on_opt<F>(&'a self, f: F) -> Self
    where
        F: Fn(Option<&'a T>) -> Option<T> + Copy,
    {
        let mapped = self.into_iter().map(f);
        let mut out: ObjectChunked<T> = mapped.collect();
        out.rename(self.name());
        out
    }

    /// Not implemented for object arrays; calling it panics.
    fn apply_with_idx<F>(&'a self, _f: F) -> Self
    where
        F: Fn((usize, &'a T)) -> T + Copy,
    {
        todo!()
    }

    /// Not implemented for object arrays; calling it panics.
    fn apply_with_idx_on_opt<F>(&'a self, _f: F) -> Self
    where
        F: Fn((usize, Option<&'a T>)) -> Option<T> + Copy,
    {
        todo!()
    }

    /// Overwrite the first `self.len()` entries of `slice`: entry `i` becomes
    /// `f(value_i, &slice[i])`.
    ///
    /// # Panics
    /// Panics if `slice` is shorter than `self`.
    fn apply_to_slice<F, V>(&'a self, f: F, slice: &mut [V])
    where
        F: Fn(Option<&'a T>, &V) -> V,
    {
        assert!(slice.len() >= self.len());
        let values = self.downcast_iter().flat_map(|arr| arr.into_iter());
        for (item, opt_val) in slice.iter_mut().zip(values) {
            *item = f(opt_val, item);
        }
    }
}