// vortex_array/array/chunked/canonical.rs
use arrow_buffer::{BooleanBufferBuilder, Buffer, MutableBuffer, ScalarBuffer};
use vortex_dtype::{DType, PType, StructDType};
use vortex_error::{vortex_bail, vortex_err, ErrString, VortexResult};
use crate::array::chunked::ChunkedArray;
use crate::array::extension::ExtensionArray;
use crate::array::null::NullArray;
use crate::array::primitive::PrimitiveArray;
use crate::array::struct_::StructArray;
use crate::array::{BinaryView, BoolArray, VarBinViewArray};
use crate::validity::Validity;
use crate::{
    Array, ArrayDType, ArrayValidity, Canonical, IntoArray, IntoArrayVariant, IntoCanonical,
};
impl IntoCanonical for ChunkedArray {
    fn into_canonical(self) -> VortexResult<Canonical> {
        let validity = if self.dtype().is_nullable() {
            self.logical_validity().into_validity()
        } else {
            Validity::NonNullable
        };
        try_canonicalize_chunks(self.chunks().collect(), validity, self.dtype())
    }
}
/// Converts a list of chunks that all share `dtype` into one [`Canonical`] array.
///
/// The strategy depends on `dtype`:
/// - `Struct`: each field is re-chunked across all chunks (see `swizzle_struct_chunks`).
/// - `Extension`: the storage arrays of the chunks are wrapped in a new `ChunkedArray`
///   which becomes the storage of a single `ExtensionArray`.
/// - `Bool`, `Primitive`, `Utf8`, `Binary`: the chunks are packed into one array.
/// - `Null`: a `NullArray` with the summed chunk length is produced.
///
/// `validity` is forwarded to the struct, bool, primitive and view packing routines; the
/// `Extension` and `Null` branches do not use it.
///
/// # Errors
///
/// - `MismatchedTypes` if any chunk's dtype differs from `dtype`; the message lists every
///   offending chunk.
/// - An error if `dtype` is `List`, which is not implemented yet (this previously panicked
///   through `todo!()`).
/// - Any error raised while converting or packing the individual chunks.
pub(crate) fn try_canonicalize_chunks(
    chunks: Vec<Array>,
    validity: Validity,
    dtype: &DType,
) -> VortexResult<Canonical> {
    // Gather all offending chunks so the error reports them together.
    let mismatched = chunks
        .iter()
        .filter(|chunk| !chunk.dtype().eq(dtype))
        .collect::<Vec<_>>();
    if !mismatched.is_empty() {
        vortex_bail!(MismatchedTypes: dtype.clone(), ErrString::from(format!("{:?}", mismatched)))
    }
    match dtype {
        DType::Struct(struct_dtype, _) => {
            let struct_array = swizzle_struct_chunks(chunks.as_slice(), validity, struct_dtype)?;
            Ok(Canonical::Struct(struct_array))
        }
        DType::Extension(ext_dtype) => {
            // Unwrap every chunk to its storage array, then re-chunk the storage.
            let storage_chunks: Vec<Array> = chunks
                .iter()
                .map(|chunk| chunk.clone().into_extension().map(|ext| ext.storage()))
                .collect::<VortexResult<Vec<Array>>>()?;
            let storage_dtype = ext_dtype.storage_dtype().clone();
            let chunked_storage =
                ChunkedArray::try_new(storage_chunks, storage_dtype)?.into_array();
            Ok(Canonical::Extension(ExtensionArray::new(
                ext_dtype.clone(),
                chunked_storage,
            )))
        }
        DType::List(..) => {
            // Report the missing implementation as an error instead of panicking, so callers
            // of `into_canonical` can handle it.
            vortex_bail!(
                "Canonicalizing chunked arrays of dtype {} is not yet supported",
                dtype
            )
        }
        DType::Bool(_) => {
            let bool_array = pack_bools(chunks.as_slice(), validity)?;
            Ok(Canonical::Bool(bool_array))
        }
        DType::Primitive(ptype, _) => {
            let prim_array = pack_primitives(chunks.as_slice(), *ptype, validity)?;
            Ok(Canonical::Primitive(prim_array))
        }
        // Strings and binary data share the same view-based canonical form.
        DType::Utf8(_) | DType::Binary(_) => {
            let varbin_array = pack_views(chunks.as_slice(), dtype, validity)?;
            Ok(Canonical::VarBinView(varbin_array))
        }
        DType::Null => {
            let len = chunks.iter().map(|chunk| chunk.len()).sum();
            let null_array = NullArray::new(len);
            Ok(Canonical::Null(null_array))
        }
    }
}
/// Turns a list of struct chunks into one [`StructArray`] whose fields are chunked arrays.
///
/// For every field of `struct_dtype`, the field at that index is taken from each chunk and
/// the results are wrapped in a `ChunkedArray` of the field's dtype. The resulting struct
/// has the summed length of all chunks and carries the given `validity`.
///
/// # Errors
///
/// Fails if a chunk has no field at one of the indices, or if building a per-field
/// `ChunkedArray` or the final `StructArray` fails.
fn swizzle_struct_chunks(
    chunks: &[Array],
    validity: Validity,
    struct_dtype: &StructDType,
) -> VortexResult<StructArray> {
    let columns = struct_dtype
        .dtypes()
        .iter()
        .enumerate()
        .map(|(field_idx, field_dtype)| {
            // Pull the same field out of every chunk, stopping at the first missing one.
            let per_chunk = chunks
                .iter()
                .map(|c| {
                    c.with_dyn(|d| {
                        d.as_struct_array_unchecked()
                            .field(field_idx)
                            .ok_or_else(|| vortex_err!("All chunks must have same dtype; missing field at index {}, current chunk dtype: {}", field_idx, c.dtype()))
                    })
                })
                .collect::<VortexResult<Vec<_>>>()?;
            ChunkedArray::try_new(per_chunk, field_dtype.clone())
                .map(|field_array| field_array.into_array())
        })
        .collect::<VortexResult<Vec<_>>>()?;
    let row_count = chunks.iter().map(|chunk| chunk.len()).sum();
    StructArray::try_new(struct_dtype.names().clone(), columns, row_count, validity)
}
/// Concatenates the boolean buffers of all chunks into a single [`BoolArray`].
///
/// Each chunk is converted with `into_bool` and its boolean buffer is appended, in chunk
/// order, to one builder sized for the summed chunk length. The given `validity` is attached
/// to the result.
///
/// # Errors
///
/// Fails if a chunk cannot be converted to a bool array or if `BoolArray::try_new` rejects
/// the packed buffer and validity.
fn pack_bools(chunks: &[Array], validity: Validity) -> VortexResult<BoolArray> {
    let total_bits = chunks.iter().map(|chunk| chunk.len()).sum();
    let mut builder = BooleanBufferBuilder::new(total_bits);
    for bools in chunks.iter().map(|chunk| chunk.clone().into_bool()) {
        builder.append_buffer(&bools?.boolean_buffer());
    }
    BoolArray::try_new(builder.finish(), validity)
}
/// Concatenates the value buffers of all chunks into a single [`PrimitiveArray`].
///
/// A byte buffer with capacity for `total length * ptype.byte_width()` is allocated, and the
/// buffer of every chunk (after `into_primitive`) is appended in chunk order. The result is
/// built with the given `ptype` and `validity`.
///
/// # Errors
///
/// Fails if a chunk cannot be converted to a primitive array.
fn pack_primitives(
    chunks: &[Array],
    ptype: PType,
    validity: Validity,
) -> VortexResult<PrimitiveArray> {
    let byte_capacity = chunks.iter().map(|chunk| chunk.len()).sum::<usize>() * ptype.byte_width();
    let mut bytes = MutableBuffer::with_capacity(byte_capacity);
    for chunk in chunks {
        let primitive = chunk.clone().into_primitive()?;
        bytes.extend_from_slice(primitive.buffer());
    }
    let packed = Buffer::from(bytes);
    Ok(PrimitiveArray::new(packed.into(), ptype, validity))
}
fn pack_views(
    chunks: &[Array],
    dtype: &DType,
    validity: Validity,
) -> VortexResult<VarBinViewArray> {
    let total_len = chunks.iter().map(|a| a.len()).sum();
    let mut views: Vec<u128> = Vec::with_capacity(total_len);
    let mut buffers = Vec::new();
    for chunk in chunks {
        let buffers_offset = buffers.len();
        let canonical_chunk = chunk.clone().into_varbinview()?;
        for buffer in canonical_chunk.buffers() {
            let canonical_buffer = buffer.into_canonical()?.into_primitive()?.into_array();
            buffers.push(canonical_buffer);
        }
        for view in canonical_chunk.binary_views()? {
            if view.is_inlined() {
                views.push(view.as_u128());
            } else {
                let view_ref = view.as_view();
                views.push(
                    BinaryView::new_view(
                        view.len(),
                        *view_ref.prefix(),
                        (buffers_offset as u32) + view_ref.buffer_index(),
                        view_ref.offset(),
                    )
                    .as_u128(),
                );
            }
        }
    }
    let views_buffer: Buffer = ScalarBuffer::<u128>::from(views).into_inner();
    VarBinViewArray::try_new(Array::from(views_buffer), buffers, dtype.clone(), validity)
}
#[cfg(test)]
mod tests {
    use vortex_dtype::{DType, Nullability};
    use crate::accessor::ArrayAccessor;
    use crate::array::chunked::canonical::pack_views;
    use crate::array::{ChunkedArray, StructArray, VarBinViewArray};
    use crate::compute::slice;
    use crate::validity::Validity;
    use crate::variants::StructArrayTrait;
    use crate::{ArrayDType, IntoArray, IntoArrayVariant, ToArray};

    /// Builds the four-element string view array shared by the tests below.
    fn stringview_array() -> VarBinViewArray {
        VarBinViewArray::from_iter_str(["foo", "bar", "baz", "quak"])
    }

    /// Packing two overlapping slices of the same array yields the concatenation of both
    /// slices.
    #[test]
    pub fn pack_sliced_varbin() {
        let array1 = slice(stringview_array().as_ref(), 1, 3).unwrap();
        let array2 = slice(stringview_array().as_ref(), 2, 4).unwrap();
        let packed = pack_views(
            &[array1, array2],
            &DType::Utf8(Nullability::NonNullable),
            Validity::NonNullable,
        )
        .unwrap();
        assert_eq!(packed.len(), 4);
        let values = packed
            .with_iterator(|iter| {
                iter.flatten()
                    // Checked conversion: a test should fail loudly on invalid UTF-8 rather
                    // than rely on an unchecked `unsafe` conversion.
                    .map(|v| String::from_utf8(v.to_vec()).unwrap())
                    .collect::<Vec<_>>()
            })
            .unwrap();
        assert_eq!(values, &["bar", "baz", "baz", "quak"]);
    }

    /// A chunked array nested inside another chunked array of structs canonicalizes to a
    /// struct whose field holds the same values as the original field.
    #[test]
    pub fn pack_nested_structs() {
        let struct_array = StructArray::try_new(
            vec!["a".into()].into(),
            vec![stringview_array().into_array()],
            4,
            Validity::NonNullable,
        )
        .unwrap();
        let dtype = struct_array.dtype().clone();
        // Chunked array whose single chunk is itself a chunked array of the struct.
        let chunked = ChunkedArray::try_new(
            vec![
                ChunkedArray::try_new(vec![struct_array.to_array()], dtype.clone())
                    .unwrap()
                    .into_array(),
            ],
            dtype,
        )
        .unwrap()
        .into_array();
        let canonical_struct = chunked.into_struct().unwrap();
        let canonical_varbin = canonical_struct
            .field(0)
            .unwrap()
            .into_varbinview()
            .unwrap();
        let original_varbin = struct_array.field(0).unwrap().into_varbinview().unwrap();
        let orig_values = original_varbin
            .with_iterator(|it| it.map(|a| a.map(|v| v.to_vec())).collect::<Vec<_>>())
            .unwrap();
        let canon_values = canonical_varbin
            .with_iterator(|it| it.map(|a| a.map(|v| v.to_vec())).collect::<Vec<_>>())
            .unwrap();
        assert_eq!(orig_values, canon_values);
    }
}