// vortex-array 0.67.0
//
// Vortex in-memory columnar data format
// Documentation
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

use std::ops::Range;

use vortex_error::VortexResult;

use crate::ArrayRef;
use crate::arrays::ChunkedArray;

/// One item produced by [`PairedChunks`]: a slice from each side plus the row
/// range the two slices cover.
///
/// Both slices are cut with the same length, so they line up element for
/// element.
pub(crate) struct AlignedPair {
    /// Slice taken from the left-hand array's current chunk.
    pub left: ArrayRef,
    /// Slice taken from the right-hand array's current chunk.
    pub right: ArrayRef,
    /// Row range covered by this pair, counted from the start of the iteration.
    pub pos: Range<usize>,
}

/// Cursor over a chunk slice that maintains the invariant: `idx` always
/// points at a non-empty chunk or is past the end.
///
/// `offset` counts the elements already consumed from the chunk at `idx`; it
/// is reset to zero whenever the cursor moves on to the next chunk.
struct ChunkCursor<'a> {
    /// The chunks being walked, in order.
    chunks: &'a [ArrayRef],
    /// Index into `chunks` of the chunk currently being consumed.
    idx: usize,
    /// Number of elements of the current chunk that have already been handed out.
    offset: usize,
}

impl<'a> ChunkCursor<'a> {
    /// Creates a cursor positioned on the first non-empty chunk of `chunks`,
    /// or past the end when there is none.
    fn new(chunks: &'a [ArrayRef]) -> Self {
        let mut cursor = Self {
            chunks,
            idx: 0,
            offset: 0,
        };
        cursor.skip_empty();
        cursor
    }

    /// Advances `idx` past any empty chunks, restoring the cursor invariant.
    ///
    /// Stops at the first non-empty chunk or once `idx` is past the end.
    fn skip_empty(&mut self) {
        // `get` returns `None` once `idx` runs off the end, which terminates
        // the loop without any unchecked indexing.
        while matches!(self.chunks.get(self.idx), Some(chunk) if chunk.is_empty()) {
            self.idx += 1;
        }
    }

    /// Returns the chunk the cursor currently points at, or `None` when the
    /// cursor is past the last chunk.
    fn current_chunk(&self) -> Option<&'a ArrayRef> {
        self.chunks.get(self.idx)
    }

    /// Returns how many elements of `chunk` are still unconsumed.
    ///
    /// `chunk` is expected to be the cursor's current chunk. `take` resets
    /// `offset` as soon as it reaches the chunk length, so the subtraction
    /// does not underflow for the current chunk.
    fn remaining(&self, chunk: &ArrayRef) -> usize {
        chunk.len() - self.offset
    }

    /// Slices the next `n` unconsumed elements out of `chunk` and advances the
    /// cursor past them.
    ///
    /// When this consumes the rest of the chunk, the cursor moves on to the
    /// next non-empty chunk (or past the end).
    ///
    /// # Errors
    ///
    /// Returns the error from `slice`; in that case the cursor is left
    /// untouched.
    fn take(&mut self, chunk: &ArrayRef, n: usize) -> VortexResult<ArrayRef> {
        let slice = chunk.slice(self.offset..self.offset + n)?;
        self.offset += n;
        if self.offset == chunk.len() {
            self.idx += 1;
            self.offset = 0;
            self.skip_empty();
        }
        Ok(slice)
    }
}

/// Iterator that walks two equal-length chunked arrays in lock step.
///
/// Each item is an [`AlignedPair`] covering the longest run of rows that lies
/// inside a single chunk on both sides.
pub(crate) struct PairedChunks<'a> {
    /// Cursor over the left-hand array's chunks.
    left: ChunkCursor<'a>,
    /// Cursor over the right-hand array's chunks.
    right: ChunkCursor<'a>,
    /// Row offset at which the next yielded pair starts.
    pos: usize,
    /// Total number of rows to cover; iteration stops once `pos` reaches it.
    total_len: usize,
}

impl ChunkedArray {
    /// Returns an iterator over aligned slices of `self` and `other`.
    ///
    /// Each yielded pair holds one slice from each array, both of the same
    /// length, together with the row range they cover.
    ///
    /// # Panics
    ///
    /// Panics if the two arrays do not have the same length.
    pub(crate) fn paired_chunks<'a>(&'a self, other: &'a ChunkedArray) -> PairedChunks<'a> {
        let total_len = self.len();
        assert_eq!(
            total_len,
            other.len(),
            "paired_chunks requires arrays of equal length"
        );

        let left = ChunkCursor::new(&self.chunks);
        let right = ChunkCursor::new(&other.chunks);
        PairedChunks {
            left,
            right,
            pos: 0,
            total_len,
        }
    }
}

impl Iterator for PairedChunks<'_> {
    type Item = VortexResult<AlignedPair>;

    /// Yields the next aligned pair of slices.
    ///
    /// The pair covers as many rows as both current chunks still have in
    /// common, so a pair never crosses a chunk boundary on either side.
    ///
    /// Returns `None` once all rows have been covered. If slicing fails, the
    /// error is yielded once and the iterator then ends.
    fn next(&mut self) -> Option<Self::Item> {
        if self.pos >= self.total_len {
            return None;
        }

        let lhs_chunk = self.left.current_chunk()?;
        let rhs_chunk = self.right.current_chunk()?;

        // The longest run that stays inside the current chunk on both sides.
        let take = self
            .left
            .remaining(lhs_chunk)
            .min(self.right.remaining(rhs_chunk));

        let (lhs_slice, rhs_slice) = match self
            .left
            .take(lhs_chunk, take)
            .and_then(|l| self.right.take(rhs_chunk, take).map(|r| (l, r)))
        {
            Ok(pair) => pair,
            Err(e) => {
                // A failed slice leaves the cursors in a state we cannot
                // resume from: either the left cursor advanced without the
                // right one (later pairs would be misaligned), or neither
                // moved (the same error would repeat forever). Mark the
                // iteration as finished so the error is reported exactly once.
                self.pos = self.total_len;
                return Some(Err(e));
            }
        };

        let start = self.pos;
        self.pos += take;

        Some(Ok(AlignedPair {
            left: lhs_slice,
            right: rhs_slice,
            pos: start..self.pos,
        }))
    }
}

#[cfg(test)]
mod tests {
    use std::ops::Range;

    use vortex_buffer::buffer;
    use vortex_error::VortexResult;

    use crate::IntoArray;
    use crate::arrays::ChunkedArray;
    use crate::dtype::DType;
    use crate::dtype::Nullability;
    use crate::dtype::PType;

    /// Left values, right values and row range of one yielded pair.
    type PairRow = (Vec<i32>, Vec<i32>, Range<usize>);

    /// The non-nullable `i32` dtype shared by every array in these tests.
    fn i32_dtype() -> DType {
        DType::Primitive(PType::I32, Nullability::NonNullable)
    }

    /// Drains `paired_chunks` into plain vectors so results can be compared
    /// with `assert_eq!`. Stops at, and returns, the first error.
    fn collect_pairs(left: &ChunkedArray, right: &ChunkedArray) -> VortexResult<Vec<PairRow>> {
        use crate::ToCanonical;

        left.paired_chunks(right)
            .map(|aligned| {
                aligned.map(|pair| {
                    let lhs: Vec<i32> = pair.left.to_primitive().as_slice::<i32>().to_vec();
                    let rhs: Vec<i32> = pair.right.to_primitive().as_slice::<i32>().to_vec();
                    (lhs, rhs, pair.pos)
                })
            })
            .collect()
    }

    /// Identical chunk boundaries on both sides yield one pair per chunk.
    #[test]
    fn test_aligned_chunks() -> VortexResult<()> {
        let lhs = ChunkedArray::try_new(
            vec![buffer![1i32, 2].into_array(), buffer![3i32, 4].into_array()],
            i32_dtype(),
        )?;
        let rhs = ChunkedArray::try_new(
            vec![
                buffer![10i32, 20].into_array(),
                buffer![30i32, 40].into_array(),
            ],
            i32_dtype(),
        )?;

        let expected: Vec<PairRow> = vec![
            (vec![1, 2], vec![10, 20], 0..2),
            (vec![3, 4], vec![30, 40], 2..4),
        ];
        assert_eq!(collect_pairs(&lhs, &rhs)?, expected);
        Ok(())
    }

    /// Differing chunk boundaries split the output at every boundary of
    /// either side.
    #[test]
    fn test_misaligned_chunks() -> VortexResult<()> {
        let lhs = ChunkedArray::try_new(
            vec![
                buffer![1i32, 2].into_array(),
                buffer![3i32].into_array(),
                buffer![4i32, 5].into_array(),
            ],
            i32_dtype(),
        )?;
        let rhs = ChunkedArray::try_new(
            vec![
                buffer![10i32].into_array(),
                buffer![20i32, 30].into_array(),
                buffer![40i32, 50].into_array(),
            ],
            i32_dtype(),
        )?;

        let expected: Vec<PairRow> = vec![
            (vec![1], vec![10], 0..1),
            (vec![2], vec![20], 1..2),
            (vec![3], vec![30], 2..3),
            (vec![4, 5], vec![40, 50], 3..5),
        ];
        assert_eq!(collect_pairs(&lhs, &rhs)?, expected);
        Ok(())
    }

    /// Empty chunks, leading or trailing, are skipped and produce no pairs.
    #[test]
    fn test_empty_chunks() -> VortexResult<()> {
        let lhs = ChunkedArray::try_new(
            vec![
                buffer![0i32; 0].into_array(),
                buffer![1i32, 2, 3].into_array(),
            ],
            i32_dtype(),
        )?;
        let rhs = ChunkedArray::try_new(
            vec![
                buffer![10i32, 20, 30].into_array(),
                buffer![0i32; 0].into_array(),
            ],
            i32_dtype(),
        )?;

        let expected: Vec<PairRow> = vec![(vec![1, 2, 3], vec![10, 20, 30], 0..3)];
        assert_eq!(collect_pairs(&lhs, &rhs)?, expected);
        Ok(())
    }

    /// One side in single-element chunks forces single-element pairs even
    /// though the other side is one big chunk.
    #[test]
    fn test_single_element_chunks() -> VortexResult<()> {
        let lhs = ChunkedArray::try_new(
            vec![
                buffer![1i32].into_array(),
                buffer![2i32].into_array(),
                buffer![3i32].into_array(),
            ],
            i32_dtype(),
        )?;
        let rhs = ChunkedArray::try_new(vec![buffer![10i32, 20, 30].into_array()], i32_dtype())?;

        let expected: Vec<PairRow> = vec![
            (vec![1], vec![10], 0..1),
            (vec![2], vec![20], 1..2),
            (vec![3], vec![30], 2..3),
        ];
        assert_eq!(collect_pairs(&lhs, &rhs)?, expected);
        Ok(())
    }

    /// Two arrays without any chunks yield nothing.
    #[test]
    fn test_both_empty() -> VortexResult<()> {
        let lhs = ChunkedArray::try_new(vec![], i32_dtype())?;
        let rhs = ChunkedArray::try_new(vec![], i32_dtype())?;

        assert!(collect_pairs(&lhs, &rhs)?.is_empty());
        Ok(())
    }

    /// Arrays of different lengths are rejected up front with a panic.
    #[test]
    #[should_panic(expected = "paired_chunks requires arrays of equal length")]
    fn test_length_mismatch_panics() {
        let lhs = ChunkedArray::try_new(vec![buffer![1i32, 2].into_array()], i32_dtype()).unwrap();
        let rhs =
            ChunkedArray::try_new(vec![buffer![10i32, 20, 30].into_array()], i32_dtype()).unwrap();

        let _pairs: Vec<_> = lhs.paired_chunks(&rhs).collect();
    }
}