vortex-array 0.23.0

Vortex in memory columnar data format
Documentation
use std::cmp::Ordering;
use std::ops::Deref;

use itertools::{Itertools, MinMaxResult};
use vortex_buffer::ByteBuffer;
use vortex_error::{vortex_panic, VortexResult};

use crate::accessor::ArrayAccessor;
use crate::array::varbin::VarBinArray;
use crate::array::{varbin_scalar, VarBinEncoding};
use crate::compute::scalar_at;
use crate::stats::{Stat, StatsSet};
use crate::vtable::StatisticsVTable;
use crate::Array;

impl StatisticsVTable<VarBinArray> for VarBinEncoding {
    fn compute_statistics(&self, array: &VarBinArray, stat: Stat) -> VortexResult<StatsSet> {
        compute_varbin_statistics(array, stat)
    }
}

pub fn compute_varbin_statistics<T: ArrayAccessor<[u8]> + Deref<Target = Array>>(
    array: &T,
    stat: Stat,
) -> VortexResult<StatsSet> {
    if stat == Stat::UncompressedSizeInBytes {
        return Ok(StatsSet::of(stat, array.nbytes()));
    }

    if array.is_empty()
        || stat == Stat::TrueCount
        || stat == Stat::RunCount
        || stat == Stat::BitWidthFreq
        || stat == Stat::TrailingZeroFreq
    {
        return Ok(StatsSet::default());
    }

    Ok(match stat {
        Stat::NullCount => {
            let null_count = array.logical_validity()?.false_count();
            if null_count == array.len() {
                return Ok(StatsSet::nulls(array.len(), array.dtype()));
            }

            let mut stats = StatsSet::of(Stat::NullCount, null_count);
            if null_count > 0 {
                // we know that there is at least one null, but not all nulls, so it's not constant
                stats.set(Stat::IsConstant, false);
            }
            stats
        }
        Stat::IsConstant => {
            let is_constant = array.with_iterator(compute_is_constant)?;
            if is_constant {
                // we know that the array is not empty
                StatsSet::constant(scalar_at(array.deref(), 0)?, array.len())
            } else {
                StatsSet::of(Stat::IsConstant, is_constant)
            }
        }
        Stat::Min | Stat::Max => compute_min_max(array)?,
        Stat::IsSorted => {
            let is_sorted = array.with_iterator(|iter| iter.flatten().is_sorted())?;
            let mut stats = StatsSet::of(Stat::IsSorted, is_sorted);
            if !is_sorted {
                stats.set(Stat::IsStrictSorted, false);
            }
            stats
        }
        Stat::IsStrictSorted => {
            let is_strict_sorted = array.with_iterator(|iter| {
                iter.flatten()
                    .is_sorted_by(|a, b| matches!(a.cmp(b), Ordering::Less))
            })?;
            let mut stats = StatsSet::of(Stat::IsStrictSorted, is_strict_sorted);
            if is_strict_sorted {
                stats.set(Stat::IsSorted, true);
            }
            stats
        }
        Stat::UncompressedSizeInBytes
        | Stat::TrueCount
        | Stat::RunCount
        | Stat::BitWidthFreq
        | Stat::TrailingZeroFreq => {
            vortex_panic!(
                "Unreachable, stat {} should have already been handled",
                stat
            )
        }
    })
}

fn compute_is_constant(iter: &mut dyn Iterator<Item = Option<&[u8]>>) -> bool {
    let Some(first_value) = iter.next() else {
        return true; // empty array is constant
    };
    for v in iter {
        if v != first_value {
            return false;
        }
    }
    true
}

fn compute_min_max<T: ArrayAccessor<[u8]>>(array: &T) -> VortexResult<StatsSet> {
    let mut stats = StatsSet::default();
    if array.is_empty() {
        return Ok(stats);
    }

    let minmax = array.with_iterator(|iter| match iter.flatten().minmax() {
        MinMaxResult::NoElements => None,
        MinMaxResult::OneElement(value) => {
            let scalar = ByteBuffer::from(value.to_vec());
            Some((scalar.clone(), scalar))
        }
        MinMaxResult::MinMax(min, max) => Some((
            ByteBuffer::from(min.to_vec()),
            ByteBuffer::from(max.to_vec()),
        )),
    })?;
    let Some((min, max)) = minmax else {
        // we know that the array is not empty, so it must be all nulls
        return Ok(StatsSet::nulls(array.len(), array.dtype()));
    };

    if min == max {
        // get (don't compute) null count if `min == max` to determine if it's constant
        if array.statistics().get_as::<u64>(Stat::NullCount) == Some(0) {
            // if there are no nulls, then the array is constant
            return Ok(StatsSet::constant(
                varbin_scalar(min, array.dtype()),
                array.len(),
            ));
        }
    } else {
        stats.set(Stat::IsConstant, false);
    }

    stats.set(Stat::Min, min);
    stats.set(Stat::Max, max);

    Ok(stats)
}

#[cfg(test)]
mod test {
    use std::ops::Deref;

    use vortex_buffer::{BufferString, ByteBuffer};
    use vortex_dtype::{DType, Nullability};

    use crate::array::varbin::VarBinArray;
    use crate::stats::Stat;

    fn array(dtype: DType) -> VarBinArray {
        VarBinArray::from_vec(
            vec!["hello world", "hello world this is a long string"],
            dtype,
        )
    }

    #[test]
    fn utf8_stats() {
        let arr = array(DType::Utf8(Nullability::NonNullable));
        assert_eq!(
            arr.statistics().compute_min::<BufferString>().unwrap(),
            BufferString::from("hello world".to_string())
        );
        assert_eq!(
            arr.statistics().compute_max::<BufferString>().unwrap(),
            BufferString::from("hello world this is a long string".to_string())
        );
        assert!(!arr.statistics().compute_is_constant().unwrap());
        assert!(arr.statistics().compute_is_sorted().unwrap());
    }

    #[test]
    fn binary_stats() {
        let arr = array(DType::Binary(Nullability::NonNullable));
        assert_eq!(
            arr.statistics()
                .compute_min::<ByteBuffer>()
                .unwrap()
                .deref(),
            b"hello world"
        );
        assert_eq!(
            arr.statistics()
                .compute_max::<ByteBuffer>()
                .unwrap()
                .deref(),
            "hello world this is a long string".as_bytes()
        );
        assert!(!arr.statistics().compute_is_constant().unwrap());
        assert!(arr.statistics().compute_is_sorted().unwrap());
    }

    #[test]
    fn some_nulls() {
        let array = VarBinArray::from_iter(
            vec![
                Some("hello world"),
                None,
                Some("hello world this is a long string"),
                None,
            ],
            DType::Utf8(Nullability::Nullable),
        );
        assert_eq!(
            array.statistics().compute_min::<BufferString>().unwrap(),
            BufferString::from("hello world".to_string())
        );
        assert_eq!(
            array.statistics().compute_max::<BufferString>().unwrap(),
            BufferString::from("hello world this is a long string".to_string())
        );
    }

    #[test]
    fn all_nulls() {
        let array = VarBinArray::from_iter(
            vec![Option::<&str>::None, None, None],
            DType::Utf8(Nullability::Nullable),
        );
        assert!(array.statistics().get(Stat::Min).is_none());
        assert!(array.statistics().get(Stat::Max).is_none());
    }
}