//! vortex-array 0.62.0 — Vortex in-memory columnar data format.
//!
//! See the crate documentation for details.
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::sync::Arc;

use num_traits::AsPrimitive;
use vortex_error::VortexExpect;
use vortex_error::VortexResult;
use vortex_error::vortex_bail;
use vortex_error::vortex_ensure;
use vortex_error::vortex_panic;

use crate::ArrayRef;
use crate::DynArray;
use crate::IntoArray;
use crate::arrays::ConstantArray;
use crate::arrays::ListVTable;
use crate::arrays::PrimitiveVTable;
use crate::builtins::ArrayBuiltins;
use crate::compute::min_max;
use crate::dtype::DType;
use crate::dtype::NativePType;
use crate::match_each_integer_ptype;
use crate::match_each_native_ptype;
use crate::scalar_fn::fns::operators::Operator;
use crate::stats::ArrayStats;
use crate::validity::Validity;

/// A list array that stores variable-length lists of elements, similar to `Vec<Vec<T>>`.
///
/// This mirrors the Apache Arrow List array encoding and provides efficient storage
/// for nested data where each row contains a list of elements of the same type.
///
/// ## Data Layout
///
/// The list array uses an offset-based encoding:
/// - **Elements array**: A flat array containing all list elements concatenated together
/// - **Offsets array**: Integer array where `offsets[i]` is an (inclusive) start index into
///   the **elements** and `offsets[i+1]` is the (exclusive) stop index for the `i`th list.
/// - **Validity**: Optional mask indicating which lists are null
///
/// This allows for excellent cascading compression of the elements and offsets, as similar values
/// are clustered together and the offsets have a predictable pattern and small deltas between
/// consecutive elements.
///
/// ## Offset Semantics
///
/// - Offsets must be non-nullable integers (i32, i64, etc.)
/// - Offsets array has length `n+1` where `n` is the number of lists
/// - List `i` contains elements from `elements[offsets[i]..offsets[i+1]]`
/// - Offsets must be monotonically increasing
///
/// # Examples
///
/// ```
/// use vortex_array::arrays::{ListArray, PrimitiveArray};
/// use vortex_array::validity::Validity;
/// use vortex_array::IntoArray;
/// use vortex_buffer::buffer;
/// use std::sync::Arc;
///
/// // Create a list array representing [[1, 2], [3, 4, 5], []]
/// let elements = buffer![1i32, 2, 3, 4, 5].into_array();
/// let offsets = buffer![0u32, 2, 5, 5].into_array(); // 4 offsets delimit 3 lists
///
/// let list_array = ListArray::try_new(
///     elements,
///     offsets,
///     Validity::NonNullable,
/// ).unwrap();
///
/// assert_eq!(list_array.len(), 3);
///
/// // Access individual lists
/// let first_list = list_array.list_elements_at(0).unwrap();
/// assert_eq!(first_list.len(), 2); // [1, 2]
///
/// let third_list = list_array.list_elements_at(2).unwrap();
/// assert!(third_list.is_empty()); // []
/// ```
#[derive(Clone, Debug)]
pub struct ListArray {
    // Always `DType::List(element_dtype, nullability)`; derived from `elements` and `validity`
    // in the constructors.
    pub(super) dtype: DType,
    // Flat array of all list elements, concatenated in list order.
    pub(super) elements: ArrayRef,
    // Non-nullable integer array of length `len + 1`; sorted, non-negative, max <= elements.len().
    pub(super) offsets: ArrayRef,
    // Per-list null mask; when it is an array its length equals `offsets.len() - 1`.
    pub(super) validity: Validity,
    // Lazily-populated statistics cache.
    pub(super) stats_set: ArrayStats,
}

/// The deconstructed components of a [`ListArray`], as returned by [`ListArray::into_parts`].
///
/// Holds the same invariant-carrying pieces used to build a [`ListArray`]; see
/// [`ListArray::new_unchecked`] for the invariants that relate them.
pub struct ListArrayParts {
    /// The list dtype of the original array.
    pub dtype: DType,
    /// Flat array of all list elements, concatenated in list order.
    pub elements: ArrayRef,
    /// Non-nullable integer offsets array of length `len + 1`.
    pub offsets: ArrayRef,
    /// Per-list null mask.
    pub validity: Validity,
}

impl ListArray {
    /// Creates a new [`ListArray`].
    ///
    /// # Panics
    ///
    /// Panics if the provided components do not satisfy the invariants documented
    /// in [`ListArray::new_unchecked`].
    pub fn new(elements: ArrayRef, offsets: ArrayRef, validity: Validity) -> Self {
        Self::try_new(elements, offsets, validity).vortex_expect("ListArray new")
    }

    /// Constructs a new `ListArray`.
    ///
    /// See [`ListArray::new_unchecked`] for more information.
    ///
    /// # Errors
    ///
    /// Returns an error if the provided components do not satisfy the invariants documented in
    /// [`ListArray::new_unchecked`].
    pub fn try_new(
        elements: ArrayRef,
        offsets: ArrayRef,
        validity: Validity,
    ) -> VortexResult<Self> {
        Self::validate(&elements, &offsets, &validity)?;

        // SAFETY: validate ensures all invariants are met.
        Ok(unsafe { Self::new_unchecked(elements, offsets, validity) })
    }

    /// Creates a new [`ListArray`] without validation from these components:
    ///
    /// * `elements` is a flat array containing all list elements concatenated.
    /// * `offsets` is an integer array where `offsets[i]` is the start index for list `i`.
    /// * `validity` holds the null values.
    ///
    /// # Safety
    ///
    /// The caller must ensure all of the following invariants are satisfied:
    ///
    /// - Offsets must be a non-nullable integer array.
    /// - Offsets must have at least one element (even for empty lists, it should contain \[0\]).
    /// - Offsets must be sorted (monotonically increasing).
    /// - All offset values must be non-negative.
    /// - The maximum offset must not exceed `elements.len()`.
    /// - If validity is an array, its length must equal `offsets.len() - 1`.
    pub unsafe fn new_unchecked(elements: ArrayRef, offsets: ArrayRef, validity: Validity) -> Self {
        // Debug builds still validate so that invariant violations surface early in tests.
        // NOTE: the message previously said `ListViewArray`, which is a different type; this
        // constructor belongs to `ListArray`.
        #[cfg(debug_assertions)]
        Self::validate(&elements, &offsets, &validity)
            .vortex_expect("[Debug Assertion]: Invalid `ListArray` parameters");

        Self {
            // The list dtype is derived from the element dtype plus the validity's nullability.
            dtype: DType::List(Arc::new(elements.dtype().clone()), validity.nullability()),
            elements,
            offsets,
            validity,
            stats_set: Default::default(),
        }
    }

    /// Validates the components that would be used to create a [`ListArray`].
    ///
    /// This function checks all the invariants required by [`ListArray::new_unchecked`].
    pub fn validate(
        elements: &ArrayRef,
        offsets: &ArrayRef,
        validity: &Validity,
    ) -> VortexResult<()> {
        // Offsets must have at least one element
        vortex_ensure!(
            !offsets.is_empty(),
            InvalidArgument: "Offsets must have at least one element, [0] for an empty list"
        );

        // Offsets must be of integer type, and cannot go lower than 0.
        vortex_ensure!(
            offsets.dtype().is_int() && !offsets.dtype().is_nullable(),
            InvalidArgument: "offsets have invalid type {}",
            offsets.dtype()
        );

        // We can safely unwrap the DType as primitive now
        let offsets_ptype = offsets.dtype().as_ptype();

        // Offsets must be sorted (but not strictly sorted, zero-length lists are allowed)
        if let Some(is_sorted) = offsets.statistics().compute_is_sorted() {
            vortex_ensure!(is_sorted, InvalidArgument: "offsets must be sorted");
        } else {
            vortex_bail!(InvalidArgument: "offsets must report is_sorted statistic");
        }

        // Validate that offsets min is non-negative, and max does not exceed the length of
        // the elements array.
        if let Some(min_max) = min_max(offsets)? {
            match_each_integer_ptype!(offsets_ptype, |P| {
                // The lint allowances cover unsigned ptypes, where `min >= 0` is trivially true.
                #[allow(clippy::absurd_extreme_comparisons, unused_comparisons)]
                {
                    let max = min_max
                        .max
                        .as_primitive()
                        .as_::<P>()
                        .vortex_expect("offsets type must fit offsets values");
                    let min = min_max
                        .min
                        .as_primitive()
                        .as_::<P>()
                        .vortex_expect("offsets type must fit offsets values");

                    vortex_ensure!(
                        min >= 0,
                        InvalidArgument: "offsets minimum {min} outside valid range [0, {max}]"
                    );

                    vortex_ensure!(
                        max <= P::try_from(elements.len()).unwrap_or_else(|_| vortex_panic!(
                            "Offsets type {} must be able to fit elements length {}",
                            <P as NativePType>::PTYPE,
                            elements.len()
                        )),
                        InvalidArgument: "Max offset {max} is beyond the length of the elements array {}",
                        elements.len()
                    );
                }
            })
        } else {
            // TODO(aduffy): fallback to slower validation pathway?
            vortex_bail!(
                InvalidArgument: "offsets array with encoding {} must support min_max compute function",
                offsets.encoding_id()
            );
        };

        // If a validity array is present, it must be the same length as the ListArray
        if let Some(validity_len) = validity.maybe_len() {
            vortex_ensure!(
                validity_len == offsets.len() - 1,
                InvalidArgument: "validity with size {validity_len} does not match array size {}",
                offsets.len() - 1
            );
        }

        Ok(())
    }

    /// Splits an array into its parts
    pub fn into_parts(self) -> ListArrayParts {
        ListArrayParts {
            dtype: self.dtype,
            elements: self.elements,
            offsets: self.offsets,
            validity: self.validity,
        }
    }

    /// Returns the offset at the given index from the list array.
    ///
    /// Returns an error if the index is out of bounds or scalar_at fails.
    pub fn offset_at(&self, index: usize) -> VortexResult<usize> {
        // `index == self.len()` is valid: the offsets array holds `len + 1` entries.
        vortex_ensure!(
            index <= self.len(),
            "Index {index} out of bounds 0..={}",
            self.len()
        );

        if let Some(p) = self.offsets().as_opt::<PrimitiveVTable>() {
            // Fast path: read straight out of the primitive buffer without materializing
            // a scalar.
            Ok(match_each_native_ptype!(p.ptype(), |P| {
                p.as_slice::<P>()[index].as_()
            }))
        } else {
            self.offsets()
                .scalar_at(index)?
                .as_primitive()
                .as_::<usize>()
                .ok_or_else(|| vortex_error::vortex_err!("offset value does not fit in usize"))
        }
    }

    /// Returns the elements of the list scalar at the given index of the list array.
    pub fn list_elements_at(&self, index: usize) -> VortexResult<ArrayRef> {
        let start = self.offset_at(index)?;
        let end = self.offset_at(index + 1)?;
        self.elements().slice(start..end)
    }

    /// Returns elements of the list array referenced by the offsets array.
    ///
    /// This is useful for discarding any potentially unused parts of the underlying `elements`
    /// child array.
    pub fn sliced_elements(&self) -> VortexResult<ArrayRef> {
        let start = self.offset_at(0)?;
        let end = self.offset_at(self.len())?;
        self.elements().slice(start..end)
    }

    /// Returns the offsets array.
    pub fn offsets(&self) -> &ArrayRef {
        &self.offsets
    }

    /// Returns the element dtype of the list array.
    pub fn element_dtype(&self) -> &Arc<DType> {
        match &self.dtype {
            DType::List(element_dtype, _) => element_dtype,
            // Unreachable for arrays built through the constructors, which always set a
            // `DType::List` dtype.
            _ => vortex_panic!("ListArray has invalid dtype {}", self.dtype),
        }
    }

    /// Returns the elements array.
    pub fn elements(&self) -> &ArrayRef {
        &self.elements
    }

    // TODO(connor)[ListView]: Create 2 functions `reset_offsets` and `recursive_reset_offsets`,
    // where `reset_offsets` is infallible.
    // Also, `reset_offsets` can be made more efficient by replacing `sub_scalar` with a match on
    // the offset type and manual subtraction and fast path where `offsets[0] == 0`.

    /// Create a copy of this array by adjusting `offsets` to start at `0` and removing elements not
    /// referenced by the `offsets`.
    pub fn reset_offsets(&self, recurse: bool) -> VortexResult<Self> {
        // Drop elements outside `offsets[0]..offsets[len]` first.
        let mut elements = self.sliced_elements()?;
        if recurse && elements.is_canonical() {
            elements = elements.to_canonical()?.compact()?.into_array();
        } else if recurse && let Some(child_list_array) = elements.as_opt::<ListVTable>() {
            // Nested lists are reset recursively so the whole tree ends up compacted.
            elements = child_list_array.reset_offsets(recurse)?.into_array();
        }

        // Shift every offset down by the first offset so the new offsets start at 0.
        let offsets = self.offsets();
        let first_offset = offsets.scalar_at(0)?;
        let adjusted_offsets = offsets.to_array().binary(
            ConstantArray::new(first_offset, offsets.len()).into_array(),
            Operator::Sub,
        )?;

        Self::try_new(elements, adjusted_offsets, self.validity.clone())
    }
}