use std::ops::Range;
use arrow_array::{Array, ArrayRef};
use arrow_buffer::BooleanBuffer;
use arrow_schema::ArrowError;
use crate::cmp::distinct;
use crate::sort::SortColumn;
/// The result of [`partition`]: a division of rows into runs of consecutive
/// rows whose values are equal in every input column.
///
/// The wrapped buffer holds one bit per pair of adjacent rows; a set bit at
/// position `i` means row `i` and row `i + 1` belong to different partitions.
/// `None` is used when the input had no rows at all.
#[derive(Debug, Clone)]
pub struct Partitions(Option<BooleanBuffer>);
impl Partitions {
    /// Returns the half-open row range covered by each partition, in row order.
    ///
    /// The ranges are contiguous: the first one starts at row 0 and each
    /// following range starts where the previous one ended. An input with no
    /// rows yields an empty vector.
    pub fn ranges(&self) -> Vec<Range<usize>> {
        let mask = match self.0.as_ref() {
            None => return Vec::new(),
            Some(mask) => mask,
        };

        // A set bit at position `i` separates row `i` from row `i + 1`, so the
        // partition that was open up to that point ends (exclusively) at `i + 1`.
        let mut ranges: Vec<Range<usize>> = mask
            .set_indices()
            .scan(0, |start, boundary| {
                let range = *start..boundary + 1;
                *start = range.end;
                Some(range)
            })
            .collect();

        // The mask has one bit per adjacent row pair, i.e. one fewer than rows.
        let row_count = mask.len() + 1;
        let tail_start = ranges.last().map_or(0, |range| range.end);
        if tail_start != row_count {
            // Rows after the final boundary form the last partition.
            ranges.push(tail_start..row_count);
        }
        ranges
    }

    /// Returns the number of partitions.
    ///
    /// This is one more than the number of boundaries, or `0` when the input
    /// had no rows.
    pub fn len(&self) -> usize {
        self.0
            .as_ref()
            .map_or(0, |mask| mask.count_set_bits() + 1)
    }

    /// Returns `true` when there are no partitions, i.e. the input had no rows.
    pub fn is_empty(&self) -> bool {
        self.0.is_none()
    }
}
/// Splits the rows of `columns` into [`Partitions`] of consecutive rows that
/// hold equal values in every column.
///
/// Only adjacent rows are compared, so equal rows that are not adjacent end
/// up in separate partitions.
///
/// # Errors
///
/// Returns [`ArrowError::InvalidArgumentError`] when `columns` is empty or
/// when the columns do not all have the same number of rows. Errors raised
/// while comparing adjacent rows of a column are propagated unchanged.
pub fn partition(columns: &[ArrayRef]) -> Result<Partitions, ArrowError> {
    let (first, rest) = columns.split_first().ok_or_else(|| {
        ArrowError::InvalidArgumentError("Partition requires at least one column".to_string())
    })?;

    let num_rows = first.len();
    if !rest.iter().all(|column| column.len() == num_rows) {
        return Err(ArrowError::InvalidArgumentError(
            "Partition columns have different row counts".to_string(),
        ));
    }

    // With fewer than two rows there are no adjacent pairs to compare: zero
    // rows means no partitions at all, a single row is one partition that is
    // represented by an empty boundary mask.
    if num_rows < 2 {
        let boundaries = (num_rows == 1).then(|| BooleanBuffer::new_unset(0));
        return Ok(Partitions(boundaries));
    }

    // A boundary exists wherever at least one column changes value, so the
    // per-column masks are combined with a bitwise OR.
    let mut boundaries = find_boundaries(first.as_ref())?;
    for column in rest {
        let next = find_boundaries(column.as_ref())?;
        boundaries = &boundaries | &next;
    }

    Ok(Partitions(Some(boundaries)))
}
/// Computes a mask with one bit per pair of adjacent rows in `v`, set where
/// the two rows of the pair are distinct from each other.
///
/// `v` must contain at least one row: the number of pairs is `v.len() - 1`.
/// [`partition`] only calls this for inputs with two or more rows.
fn find_boundaries(v: &dyn Array) -> Result<BooleanBuffer, ArrowError> {
    let pair_count = v.len() - 1;
    // Compare the array against itself shifted by one row.
    let leading = v.slice(0, pair_count);
    let trailing = v.slice(1, pair_count);
    distinct(&leading, &trailing).map(|differs| differs.values().clone())
}
#[deprecated(note = "Use partition")]
pub fn lexicographical_partition_ranges(
columns: &[SortColumn],
) -> Result<impl Iterator<Item = Range<usize>> + '_, ArrowError> {
let cols: Vec<_> = columns.iter().map(|x| x.values.clone()).collect();
Ok(partition(&cols)?.ranges().into_iter())
}
// Unit tests for `partition` and the `Partitions` accessors.
#[cfg(test)]
mod tests {
use std::sync::Arc;
use arrow_array::*;
use arrow_schema::DataType;
use super::*;
// Passing no columns at all is rejected with an invalid-argument error.
#[test]
fn test_partition_empty() {
let err = partition(&[]).unwrap_err();
assert_eq!(
err.to_string(),
"Invalid argument error: Partition requires at least one column"
);
}
// Columns with different row counts (2 vs 1) are rejected.
#[test]
fn test_partition_unaligned_rows() {
let input = vec![
Arc::new(Int64Array::from(vec![None, Some(-1)])) as _,
Arc::new(StringArray::from(vec![Some("foo")])) as _,
];
let err = partition(&input).unwrap_err();
assert_eq!(
err.to_string(),
"Invalid argument error: Partition columns have different row counts"
)
}
// Inputs too short to contain any adjacent row pair: zero rows produce no
// partitions, a single row produces exactly one partition covering it.
#[test]
fn test_partition_small() {
let results = partition(&[
Arc::new(Int32Array::new(vec![].into(), None)) as _,
Arc::new(Int32Array::new(vec![].into(), None)) as _,
Arc::new(Int32Array::new(vec![].into(), None)) as _,
])
.unwrap();
assert_eq!(results.len(), 0);
assert!(results.is_empty());
let results = partition(&[
Arc::new(Int32Array::from(vec![1])) as _,
Arc::new(Int32Array::from(vec![1])) as _,
])
.unwrap()
.ranges();
assert_eq!(results.len(), 1);
assert_eq!(results[0], 0..1);
}
// A single column with a long run of equal values between two distinct
// values yields three partitions.
#[test]
fn test_partition_single_column() {
let a = Int64Array::from(vec![1, 2, 2, 2, 2, 2, 2, 2, 9]);
let input = vec![Arc::new(a) as _];
assert_eq!(
partition(&input).unwrap().ranges(),
vec![(0..1), (1..8), (8..9)],
);
}
// A column of 1000 identical values is a single partition.
#[test]
fn test_partition_all_equal_values() {
let a = Int64Array::from_value(1, 1000);
let input = vec![Arc::new(a) as _];
assert_eq!(partition(&input).unwrap().ranges(), vec![(0..1000)]);
}
// All-null columns (of different data types) form a single partition:
// adjacent nulls are not treated as a boundary.
#[test]
fn test_partition_all_null_values() {
let input = vec![
new_null_array(&DataType::Int8, 1000),
new_null_array(&DataType::UInt16, 1000),
];
assert_eq!(partition(&input).unwrap().ranges(), vec![(0..1000)]);
}
// Two rows that differ in both columns give one partition per row.
#[test]
fn test_partition_unique_column_1() {
let input = vec![
Arc::new(Int64Array::from(vec![None, Some(-1)])) as _,
Arc::new(StringArray::from(vec![Some("foo"), Some("bar")])) as _,
];
assert_eq!(partition(&input).unwrap().ranges(), vec![(0..1), (1..2)],);
}
// The first column repeats a value in rows 1 and 2, but the second column
// differs there, so every row is still its own partition.
#[test]
fn test_partition_unique_column_2() {
let input = vec![
Arc::new(Int64Array::from(vec![None, Some(-1), Some(-1)])) as _,
Arc::new(StringArray::from(vec![
Some("foo"),
Some("bar"),
Some("apple"),
])) as _,
];
assert_eq!(
partition(&input).unwrap().ranges(),
vec![(0..1), (1..2), (2..3),],
);
}
// Rows 1 and 2 are equal in both columns and are grouped together; row 3
// matches them only in the second column and starts a new partition.
#[test]
fn test_partition_non_unique_column_1() {
let input = vec![
Arc::new(Int64Array::from(vec![None, Some(-1), Some(-1), Some(1)])) as _,
Arc::new(StringArray::from(vec![
Some("foo"),
Some("bar"),
Some("bar"),
Some("bar"),
])) as _,
];
assert_eq!(
partition(&input).unwrap().ranges(),
vec![(0..1), (1..3), (3..4),],
);
}
// Null slots here carry underlying values that equal neighbouring valid
// values. Boundaries must follow the validity mask rather than the masked
// values: null vs. valid is a boundary, null vs. null is not.
#[test]
fn test_partition_masked_nulls() {
let input = vec![
Arc::new(Int64Array::new(vec![1; 9].into(), None)) as _,
Arc::new(Int64Array::new(
vec![1, 1, 2, 2, 2, 3, 3, 3, 3].into(),
Some(vec![false, true, true, true, true, false, false, true, false].into()),
)) as _,
Arc::new(Int64Array::new(
vec![1, 1, 2, 2, 2, 2, 2, 3, 7].into(),
Some(vec![true, true, true, true, false, true, true, true, false].into()),
)) as _,
];
assert_eq!(
partition(&input).unwrap().ranges(),
vec![(0..1), (1..2), (2..4), (4..5), (5..7), (7..8), (8..9)],
);
}
}