arrow-ord 52.1.0

Ordering kernels for arrow arrays
Documentation
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements.  See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership.  The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License.  You may obtain a copy of the License at
//
//   http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied.  See the License for the
// specific language governing permissions and limitations
// under the License.

//! Defines partition kernel for `ArrayRef`

use std::ops::Range;

use arrow_array::{Array, ArrayRef};
use arrow_buffer::BooleanBuffer;
use arrow_schema::ArrowError;

use crate::cmp::distinct;
use crate::sort::SortColumn;

/// A computed set of partitions, see [`partition`]
#[derive(Debug, Clone)]
pub struct Partitions(Option<BooleanBuffer>);

impl Partitions {
    /// Returns the range of each partition
    ///
    /// Consecutive ranges will be contiguous: i.e [`(a, b)` and `(b, c)`], and
    /// `start = 0` and `end = self.len()` for the first and last range respectively
    pub fn ranges(&self) -> Vec<Range<usize>> {
        let boundaries = match &self.0 {
            Some(boundaries) => boundaries,
            None => return vec![],
        };

        let mut out = vec![];
        let mut current = 0;
        for idx in boundaries.set_indices() {
            let t = current;
            current = idx + 1;
            out.push(t..current)
        }
        let last = boundaries.len() + 1;
        if current != last {
            out.push(current..last)
        }
        out
    }

    /// Returns the number of partitions
    pub fn len(&self) -> usize {
        match &self.0 {
            Some(b) => b.count_set_bits() + 1,
            None => 0,
        }
    }

    /// Returns true if this contains no partitions
    pub fn is_empty(&self) -> bool {
        self.0.is_none()
    }
}

/// Given a list of lexicographically sorted columns, computes the [`Partitions`],
/// where a partition consists of the set of consecutive rows with equal values
///
/// Returns an error if no columns are specified or all columns do not
/// have the same number of rows.
///
/// # Example:
///
/// For example, given columns `x`, `y` and `z`, calling
/// [`partition`]`(values, (x, y))` will divide the
/// rows into ranges where the values of `(x, y)` are equal:
///
/// ```text
/// ┌ ─ ┬───┬ ─ ─┌───┐─ ─ ┬───┬ ─ ─ ┐
///     │ 1 │    │ 1 │    │ A │        Range: 0..1 (x=1, y=1)
/// ├ ─ ┼───┼ ─ ─├───┤─ ─ ┼───┼ ─ ─ ┤
///     │ 1 │    │ 2 │    │ B │
/// │   ├───┤    ├───┤    ├───┤     │
///     │ 1 │    │ 2 │    │ C │        Range: 1..4 (x=1, y=2)
/// │   ├───┤    ├───┤    ├───┤     │
///     │ 1 │    │ 2 │    │ D │
/// ├ ─ ┼───┼ ─ ─├───┤─ ─ ┼───┼ ─ ─ ┤
///     │ 2 │    │ 1 │    │ E │        Range: 4..5 (x=2, y=1)
/// ├ ─ ┼───┼ ─ ─├───┤─ ─ ┼───┼ ─ ─ ┤
///     │ 3 │    │ 1 │    │ F │        Range: 5..6 (x=3, y=1)
/// └ ─ ┴───┴ ─ ─└───┘─ ─ ┴───┴ ─ ─ ┘
///
///       x        y        z     partition(&[x, y])
/// ```
///
/// # Example Code
///
/// ```
/// # use std::{sync::Arc, ops::Range};
/// # use arrow_array::{RecordBatch, Int64Array, StringArray, ArrayRef};
/// # use arrow_ord::sort::{SortColumn, SortOptions};
/// # use arrow_ord::partition::partition;
/// let batch = RecordBatch::try_from_iter(vec![
///     ("x", Arc::new(Int64Array::from(vec![1, 1, 1, 1, 2, 3])) as ArrayRef),
///     ("y", Arc::new(Int64Array::from(vec![1, 2, 2, 2, 1, 1])) as ArrayRef),
///     ("z", Arc::new(StringArray::from(vec!["A", "B", "C", "D", "E", "F"])) as ArrayRef),
/// ]).unwrap();
///
/// // Partition on first two columns
/// let ranges = partition(&batch.columns()[..2]).unwrap().ranges();
///
/// let expected = vec![
///     (0..1),
///     (1..4),
///     (4..5),
///     (5..6),
/// ];
///
/// assert_eq!(ranges, expected);
/// ```
pub fn partition(columns: &[ArrayRef]) -> Result<Partitions, ArrowError> {
    if columns.is_empty() {
        return Err(ArrowError::InvalidArgumentError(
            "Partition requires at least one column".to_string(),
        ));
    }
    let num_rows = columns[0].len();
    if columns.iter().any(|item| item.len() != num_rows) {
        return Err(ArrowError::InvalidArgumentError(
            "Partition columns have different row counts".to_string(),
        ));
    };

    match num_rows {
        0 => return Ok(Partitions(None)),
        1 => return Ok(Partitions(Some(BooleanBuffer::new_unset(0)))),
        _ => {}
    }

    let acc = find_boundaries(&columns[0])?;
    let acc = columns
        .iter()
        .skip(1)
        .try_fold(acc, |acc, c| find_boundaries(c.as_ref()).map(|b| &acc | &b))?;

    Ok(Partitions(Some(acc)))
}

/// Returns a mask with bits set whenever the value or nullability changes
fn find_boundaries(v: &dyn Array) -> Result<BooleanBuffer, ArrowError> {
    let slice_len = v.len() - 1;
    let v1 = v.slice(0, slice_len);
    let v2 = v.slice(1, slice_len);
    Ok(distinct(&v1, &v2)?.values().clone())
}

/// Use [`partition`] instead. Given a list of already sorted columns, find
/// partition ranges that would partition lexicographically equal values across
/// columns.
///
/// The returned vec would be of size k where k is cardinality of the sorted values; Consecutive
/// values will be connected: (a, b) and (b, c), where start = 0 and end = n for the first and last
/// range.
#[deprecated(note = "Use partition")]
pub fn lexicographical_partition_ranges(
    columns: &[SortColumn],
) -> Result<impl Iterator<Item = Range<usize>> + '_, ArrowError> {
    let cols: Vec<_> = columns.iter().map(|x| x.values.clone()).collect();
    Ok(partition(&cols)?.ranges().into_iter())
}

#[cfg(test)]
mod tests {
    use std::sync::Arc;

    use arrow_array::*;
    use arrow_schema::DataType;

    use super::*;

    #[test]
    fn test_partition_empty() {
        let err = partition(&[]).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid argument error: Partition requires at least one column"
        );
    }

    #[test]
    fn test_partition_unaligned_rows() {
        let input = vec![
            Arc::new(Int64Array::from(vec![None, Some(-1)])) as _,
            Arc::new(StringArray::from(vec![Some("foo")])) as _,
        ];
        let err = partition(&input).unwrap_err();
        assert_eq!(
            err.to_string(),
            "Invalid argument error: Partition columns have different row counts"
        )
    }

    #[test]
    fn test_partition_small() {
        let results = partition(&[
            Arc::new(Int32Array::new(vec![].into(), None)) as _,
            Arc::new(Int32Array::new(vec![].into(), None)) as _,
            Arc::new(Int32Array::new(vec![].into(), None)) as _,
        ])
        .unwrap();
        assert_eq!(results.len(), 0);
        assert!(results.is_empty());

        let results = partition(&[
            Arc::new(Int32Array::from(vec![1])) as _,
            Arc::new(Int32Array::from(vec![1])) as _,
        ])
        .unwrap()
        .ranges();
        assert_eq!(results.len(), 1);
        assert_eq!(results[0], 0..1);
    }

    #[test]
    fn test_partition_single_column() {
        let a = Int64Array::from(vec![1, 2, 2, 2, 2, 2, 2, 2, 9]);
        let input = vec![Arc::new(a) as _];
        assert_eq!(
            partition(&input).unwrap().ranges(),
            vec![(0..1), (1..8), (8..9)],
        );
    }

    #[test]
    fn test_partition_all_equal_values() {
        let a = Int64Array::from_value(1, 1000);
        let input = vec![Arc::new(a) as _];
        assert_eq!(partition(&input).unwrap().ranges(), vec![(0..1000)]);
    }

    #[test]
    fn test_partition_all_null_values() {
        let input = vec![
            new_null_array(&DataType::Int8, 1000),
            new_null_array(&DataType::UInt16, 1000),
        ];
        assert_eq!(partition(&input).unwrap().ranges(), vec![(0..1000)]);
    }

    #[test]
    fn test_partition_unique_column_1() {
        let input = vec![
            Arc::new(Int64Array::from(vec![None, Some(-1)])) as _,
            Arc::new(StringArray::from(vec![Some("foo"), Some("bar")])) as _,
        ];
        assert_eq!(partition(&input).unwrap().ranges(), vec![(0..1), (1..2)],);
    }

    #[test]
    fn test_partition_unique_column_2() {
        let input = vec![
            Arc::new(Int64Array::from(vec![None, Some(-1), Some(-1)])) as _,
            Arc::new(StringArray::from(vec![
                Some("foo"),
                Some("bar"),
                Some("apple"),
            ])) as _,
        ];
        assert_eq!(
            partition(&input).unwrap().ranges(),
            vec![(0..1), (1..2), (2..3),],
        );
    }

    #[test]
    fn test_partition_non_unique_column_1() {
        let input = vec![
            Arc::new(Int64Array::from(vec![None, Some(-1), Some(-1), Some(1)])) as _,
            Arc::new(StringArray::from(vec![
                Some("foo"),
                Some("bar"),
                Some("bar"),
                Some("bar"),
            ])) as _,
        ];
        assert_eq!(
            partition(&input).unwrap().ranges(),
            vec![(0..1), (1..3), (3..4),],
        );
    }

    #[test]
    fn test_partition_masked_nulls() {
        let input = vec![
            Arc::new(Int64Array::new(vec![1; 9].into(), None)) as _,
            Arc::new(Int64Array::new(
                vec![1, 1, 2, 2, 2, 3, 3, 3, 3].into(),
                Some(vec![false, true, true, true, true, false, false, true, false].into()),
            )) as _,
            Arc::new(Int64Array::new(
                vec![1, 1, 2, 2, 2, 2, 2, 3, 7].into(),
                Some(vec![true, true, true, true, false, true, true, true, false].into()),
            )) as _,
        ];

        assert_eq!(
            partition(&input).unwrap().ranges(),
            vec![(0..1), (1..2), (2..4), (4..5), (5..7), (7..8), (8..9)],
        );
    }
}