use std::fmt::Display;
use std::fmt::Formatter;
use std::sync::Arc;
use num_traits::AsPrimitive;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_ensure;
use vortex_error::vortex_panic;
use crate::ArrayRef;
use crate::IntoArray;
use crate::LEGACY_SESSION;
use crate::VortexSessionExecute;
use crate::aggregate_fn::fns::min_max::min_max;
use crate::array::Array;
use crate::array::ArrayParts;
use crate::array::TypedArrayRef;
use crate::array::child_to_validity;
use crate::array::validity_to_child;
use crate::arrays::ConstantArray;
use crate::arrays::List;
use crate::arrays::Primitive;
use crate::builtins::ArrayBuiltins;
use crate::dtype::DType;
use crate::dtype::NativePType;
use crate::match_each_integer_ptype;
use crate::match_each_native_ptype;
use crate::scalar_fn::fns::operators::Operator;
use crate::validity::Validity;
/// Slot index holding the list's flattened `elements` child array.
pub(super) const ELEMENTS_SLOT: usize = 0;
/// Slot index holding the list's `offsets` child array.
pub(super) const OFFSETS_SLOT: usize = 1;
/// Slot index holding the optional validity child array.
pub(super) const VALIDITY_SLOT: usize = 2;
/// Total number of slots a list array carries.
pub(super) const NUM_SLOTS: usize = 3;
/// Human-readable slot names, indexed by the `*_SLOT` constants above.
pub(super) const SLOT_NAMES: [&str; NUM_SLOTS] = ["elements", "offsets", "validity"];
/// Marker payload for list arrays.
///
/// The type holds no fields; the elements, offsets and validity are kept in the
/// slots assembled by [`ListData::make_slots`].
#[derive(Clone, Debug, Default)]
pub struct ListData;

impl Display for ListData {
    /// Writes nothing: there is no per-instance state to render.
    fn fmt(&self, _formatter: &mut Formatter<'_>) -> std::fmt::Result {
        Ok(())
    }
}
/// Owned components of a list array, as produced by `Array<List>::into_data_parts`.
pub struct ListDataParts {
/// The elements child array, taken from the elements slot.
pub elements: ArrayRef,
/// The offsets child array, taken from the offsets slot.
pub offsets: ArrayRef,
/// The validity of the list entries.
pub validity: Validity,
/// The dtype of the list array the parts were taken from.
pub dtype: DType,
}
impl ListData {
    /// Assembles the slot vector for a list array.
    ///
    /// The order matches the `ELEMENTS_SLOT`, `OFFSETS_SLOT` and `VALIDITY_SLOT`
    /// indices: elements, offsets, then whatever child `validity_to_child`
    /// produces for `validity` and `len`.
    pub(crate) fn make_slots(
        elements: &ArrayRef,
        offsets: &ArrayRef,
        validity: &Validity,
        len: usize,
    ) -> Vec<Option<ArrayRef>> {
        vec![
            Some(elements.clone()),
            Some(offsets.clone()),
            validity_to_child(validity, len),
        ]
    }

    /// Validates the components and returns the list data marker.
    ///
    /// # Panics
    ///
    /// Panics (with the context `"ListArray new"`) if [`ListData::validate`]
    /// rejects the components.
    pub fn build(elements: ArrayRef, offsets: ArrayRef, validity: Validity) -> Self {
        Self::try_build(elements, offsets, validity).vortex_expect("ListArray new")
    }

    /// Fallible counterpart of [`ListData::build`].
    ///
    /// # Errors
    ///
    /// Returns the error from [`ListData::validate`] when the components are
    /// not a valid list layout.
    pub(crate) fn try_build(
        elements: ArrayRef,
        offsets: ArrayRef,
        validity: Validity,
    ) -> VortexResult<Self> {
        Self::validate(&elements, &offsets, &validity)?;
        // SAFETY: the components were validated on the line above.
        Ok(unsafe { Self::new_unchecked() })
    }

    /// Creates the list data marker without validating anything.
    ///
    /// # Safety
    ///
    /// This function performs no checks. NOTE(review): callers are presumably
    /// expected to guarantee that the components they pair with this value
    /// would pass [`ListData::validate`] — confirm against the call sites.
    pub unsafe fn new_unchecked() -> Self {
        Self
    }

    /// Checks that `elements`, `offsets` and `validity` form a valid list layout.
    ///
    /// # Errors
    ///
    /// Returns an `InvalidArgument` error when any of the following holds:
    ///
    /// * `offsets` is empty;
    /// * `offsets` is not a non-nullable integer array;
    /// * `offsets` is not sorted, or cannot report its sortedness;
    /// * `offsets` does not support the `min_max` computation;
    /// * the smallest offset is negative;
    /// * the offsets integer type cannot represent `elements.len()`;
    /// * the largest offset exceeds `elements.len()`;
    /// * `validity` reports a length different from `offsets.len() - 1`.
    ///
    /// Errors raised by `min_max` itself are propagated unchanged.
    ///
    /// # Panics
    ///
    /// Panics if the computed minimum or maximum offset cannot be read back as
    /// the offsets' own integer type, which would be an internal invariant
    /// violation.
    pub fn validate(
        elements: &ArrayRef,
        offsets: &ArrayRef,
        validity: &Validity,
    ) -> VortexResult<()> {
        vortex_ensure!(
            !offsets.is_empty(),
            InvalidArgument: "Offsets must have at least one element, [0] for an empty list"
        );
        vortex_ensure!(
            offsets.dtype().is_int() && !offsets.dtype().is_nullable(),
            InvalidArgument: "offsets have invalid type {}",
            offsets.dtype()
        );
        let offsets_ptype = offsets.dtype().as_ptype();

        if let Some(is_sorted) = offsets.statistics().compute_is_sorted() {
            vortex_ensure!(is_sorted, InvalidArgument: "offsets must be sorted");
        } else {
            vortex_bail!(InvalidArgument: "offsets must report is_sorted statistic");
        }

        let mut ctx = LEGACY_SESSION.create_execution_ctx();
        if let Some(min_max) = min_max(offsets, &mut ctx)? {
            match_each_integer_ptype!(offsets_ptype, |P| {
                // `min >= 0` is trivially true when `P` is unsigned, which would
                // otherwise trip these lints.
                #[allow(clippy::absurd_extreme_comparisons, unused_comparisons)]
                {
                    let max = min_max
                        .max
                        .as_primitive()
                        .as_::<P>()
                        .vortex_expect("offsets type must fit offsets values");
                    let min = min_max
                        .min
                        .as_primitive()
                        .as_::<P>()
                        .vortex_expect("offsets type must fit offsets values");
                    vortex_ensure!(
                        min >= 0,
                        InvalidArgument: "offsets minimum {min} outside valid range [0, {max}]"
                    );
                    // The elements length depends on the caller's input, so a
                    // length that does not fit the offsets type is reported as
                    // an error rather than a panic.
                    let Ok(elements_len) = P::try_from(elements.len()) else {
                        vortex_bail!(
                            InvalidArgument: "Offsets type {} must be able to fit elements length {}",
                            <P as NativePType>::PTYPE,
                            elements.len()
                        );
                    };
                    vortex_ensure!(
                        max <= elements_len,
                        InvalidArgument: "Max offset {max} is beyond the length of the elements array {}",
                        elements.len()
                    );
                }
            })
        } else {
            vortex_bail!(
                InvalidArgument: "offsets array with encoding {} must support min_max compute function",
                offsets.encoding_id()
            );
        };

        // `offsets` is known to be non-empty here, so the subtraction cannot underflow.
        if let Some(validity_len) = validity.maybe_len() {
            vortex_ensure!(
                validity_len == offsets.len() - 1,
                InvalidArgument: "validity with size {validity_len} does not match array size {}",
                offsets.len() - 1
            );
        }
        Ok(())
    }
}
pub trait ListArrayExt: TypedArrayRef<List> {
fn nullability(&self) -> crate::dtype::Nullability {
match self.as_ref().dtype() {
DType::List(_, nullability) => *nullability,
_ => unreachable!("ListArrayExt requires a list dtype"),
}
}
fn elements(&self) -> &ArrayRef {
self.as_ref().slots()[ELEMENTS_SLOT]
.as_ref()
.vortex_expect("ListArray elements slot")
}
fn offsets(&self) -> &ArrayRef {
self.as_ref().slots()[OFFSETS_SLOT]
.as_ref()
.vortex_expect("ListArray offsets slot")
}
fn list_validity(&self) -> Validity {
child_to_validity(&self.as_ref().slots()[VALIDITY_SLOT], self.nullability())
}
fn list_validity_mask(&self) -> vortex_mask::Mask {
self.list_validity().to_mask(self.as_ref().len())
}
fn offset_at(&self, index: usize) -> VortexResult<usize> {
vortex_ensure!(
index <= self.as_ref().len(),
"Index {index} out of bounds 0..={}",
self.as_ref().len()
);
if let Some(p) = self.offsets().as_opt::<Primitive>() {
Ok(match_each_native_ptype!(p.ptype(), |P| {
p.as_slice::<P>()[index].as_()
}))
} else {
self.offsets()
.scalar_at(index)?
.as_primitive()
.as_::<usize>()
.ok_or_else(|| vortex_error::vortex_err!("offset value does not fit in usize"))
}
}
fn list_elements_at(&self, index: usize) -> VortexResult<ArrayRef> {
let start = self.offset_at(index)?;
let end = self.offset_at(index + 1)?;
self.elements().slice(start..end)
}
fn sliced_elements(&self) -> VortexResult<ArrayRef> {
let start = self.offset_at(0)?;
let end = self.offset_at(self.as_ref().len())?;
self.elements().slice(start..end)
}
fn element_dtype(&self) -> &DType {
self.elements().dtype()
}
fn reset_offsets(&self, recurse: bool) -> VortexResult<Array<List>> {
let mut elements = self.sliced_elements()?;
if recurse && elements.is_canonical() {
elements = elements.to_canonical()?.compact()?.into_array();
} else if recurse && let Some(child_list_array) = elements.as_opt::<List>() {
elements = child_list_array
.into_owned()
.reset_offsets(recurse)?
.into_array();
}
let offsets = self.offsets();
let first_offset = offsets.scalar_at(0)?;
let adjusted_offsets = offsets.clone().binary(
ConstantArray::new(first_offset, offsets.len()).into_array(),
Operator::Sub,
)?;
Array::<List>::try_new(elements, adjusted_offsets, self.list_validity())
}
}
/// Blanket implementation: every typed reference to a list array gets [`ListArrayExt`].
impl<T> ListArrayExt for T where T: TypedArrayRef<List> {}
impl Array<List> {
pub fn new(elements: ArrayRef, offsets: ArrayRef, validity: Validity) -> Self {
let dtype = DType::List(Arc::new(elements.dtype().clone()), validity.nullability());
let len = offsets.len().saturating_sub(1);
let slots = ListData::make_slots(&elements, &offsets, &validity, len);
let data = ListData::build(elements, offsets, validity);
unsafe {
Array::from_parts_unchecked(ArrayParts::new(List, dtype, len, data).with_slots(slots))
}
}
pub fn try_new(
elements: ArrayRef,
offsets: ArrayRef,
validity: Validity,
) -> VortexResult<Self> {
let dtype = DType::List(Arc::new(elements.dtype().clone()), validity.nullability());
let len = offsets.len().saturating_sub(1);
let slots = ListData::make_slots(&elements, &offsets, &validity, len);
let data = ListData::try_build(elements, offsets, validity)?;
Ok(unsafe {
Array::from_parts_unchecked(ArrayParts::new(List, dtype, len, data).with_slots(slots))
})
}
pub unsafe fn new_unchecked(elements: ArrayRef, offsets: ArrayRef, validity: Validity) -> Self {
let dtype = DType::List(Arc::new(elements.dtype().clone()), validity.nullability());
let len = offsets.len().saturating_sub(1);
let slots = ListData::make_slots(&elements, &offsets, &validity, len);
let data = unsafe { ListData::new_unchecked() };
unsafe {
Array::from_parts_unchecked(ArrayParts::new(List, dtype, len, data).with_slots(slots))
}
}
pub fn into_data_parts(self) -> ListDataParts {
let dtype = self.dtype().clone();
let elements = self.slots()[ELEMENTS_SLOT]
.clone()
.vortex_expect("ListArray elements slot");
let offsets = self.slots()[OFFSETS_SLOT]
.clone()
.vortex_expect("ListArray offsets slot");
let validity = self.list_validity();
ListDataParts {
elements,
offsets,
validity,
dtype,
}
}
}