use crate::sort::{LexicographicalComparator, SortColumn};
use arrow_schema::ArrowError;
use std::cmp::Ordering;
use std::ops::Range;
/// Splits the rows described by `columns` into consecutive index ranges, one
/// per run of rows that the lexicographical comparator does not order apart.
///
/// Partition boundaries are located by searching forward from the start of
/// each partition, so the rows are expected to already be sorted according to
/// `columns`.
///
/// The ranges are produced lazily: each call to `next` on the returned
/// iterator locates one more partition boundary.
///
/// # Errors
///
/// * `ArrowError::InvalidArgumentError` if `columns` is empty.
/// * `ArrowError::ComputeError` if the columns do not all have the same length.
/// * Any error reported while building the `LexicographicalComparator`.
pub fn lexicographical_partition_ranges(
    columns: &[SortColumn],
) -> Result<impl Iterator<Item = Range<usize>> + '_, ArrowError> {
    let partitions = LexicographicalPartitionIterator::try_new(columns)?;
    Ok(partitions)
}
/// Iterator state for walking the partitions of a set of sort columns.
///
/// Each call to `next` yields the `Range<usize>` of row indices that make up
/// the next partition.
struct LexicographicalPartitionIterator<'a> {
/// Compares two rows, addressed by index, across the sort columns.
comparator: LexicographicalComparator<'a>,
/// Row count shared by every column (taken from the first column's length).
num_rows: usize,
/// End of the most recently yielded range; used as the start of the next one.
previous_partition_point: usize,
/// Row index the next search starts from; iteration ends once it reaches `num_rows`.
partition_point: usize,
}
impl<'a> LexicographicalPartitionIterator<'a> {
    /// Validates `columns` and builds an iterator positioned before the first
    /// partition.
    ///
    /// # Errors
    ///
    /// * `ArrowError::InvalidArgumentError` when `columns` is empty.
    /// * `ArrowError::ComputeError` when the columns differ in length.
    /// * Whatever `LexicographicalComparator::try_new` reports for `columns`.
    fn try_new(columns: &'a [SortColumn]) -> Result<Self, ArrowError> {
        // The first column defines the row count every other column must match.
        let num_rows = match columns.first() {
            Some(first) => first.values.len(),
            None => {
                return Err(ArrowError::InvalidArgumentError(
                    "Sort requires at least one column".to_string(),
                ));
            }
        };

        let same_length = columns
            .iter()
            .all(|column| column.values.len() == num_rows);
        if !same_length {
            return Err(ArrowError::ComputeError(
                "Lexical sort columns have different row counts".to_string(),
            ));
        }

        let comparator = LexicographicalComparator::try_new(columns)?;
        Ok(Self {
            comparator,
            num_rows,
            previous_partition_point: 0,
            partition_point: 0,
        })
    }
}
/// Finds where the partition that begins at row `start` ends, looking no
/// further than `end`.
///
/// A row counts as part of the partition while the comparator does not rank it
/// `Greater` than row `start`. The probe offset doubles until it either reaches
/// `end` or lands on a row outside the partition; the boundary is then pinned
/// down with a binary search over the last doubling interval.
///
/// Callers are expected to pass `start < end`.
#[inline]
fn exponential_search_next_partition_point(
    start: usize,
    end: usize,
    comparator: &LexicographicalComparator<'_>,
) -> usize {
    // True while `row` is not ordered after the first row of the partition.
    let in_partition = |row: usize| comparator.compare(row, start) != Ordering::Greater;

    let mut step = 1;
    while start + step < end && in_partition(start + step) {
        step *= 2;
    }

    // The previous probe (`start + step / 2`) was inside the partition. The
    // upper bound is one past `start + step` because that row itself still has
    // to be examined by the binary search; it is clamped to `end`.
    let lower = start + step / 2;
    let upper = end.min(start + step + 1);
    partition_point(lower, upper, in_partition)
}
/// Binary-searches the index range `start..end` and returns the first index at
/// which `pred` is `false`.
///
/// `pred` is expected to be `true` for a (possibly empty) prefix of the range
/// and `false` for the remainder. If `pred` holds for every index, `end` is
/// returned.
///
/// An empty or inverted range (`start >= end`) returns `start` without calling
/// `pred`, and never underflows regardless of build profile.
#[inline]
fn partition_point<P: Fn(usize) -> bool>(start: usize, end: usize, pred: P) -> usize {
    let mut left = start;
    let mut right = end;
    while left < right {
        // `left < right` holds here, so `right - left` cannot underflow, and
        // this midpoint form cannot overflow the way `(left + right) / 2` could.
        let mid = left + (right - left) / 2;
        if pred(mid) {
            left = mid + 1;
        } else {
            right = mid;
        }
    }
    left
}
impl<'a> Iterator for LexicographicalPartitionIterator<'a> {
    type Item = Range<usize>;

    /// Yields the next partition as a half-open row range, or `None` once the
    /// search position has reached `num_rows`.
    fn next(&mut self) -> Option<Self::Item> {
        if self.partition_point >= self.num_rows {
            return None;
        }

        // Search forward from the current position for the end of the
        // partition that starts there.
        let boundary = exponential_search_next_partition_point(
            self.partition_point,
            self.num_rows,
            &self.comparator,
        );

        // The new range begins where the previously yielded one ended.
        let range = Range {
            start: self.previous_partition_point,
            end: boundary,
        };
        self.partition_point = boundary;
        self.previous_partition_point = boundary;
        Some(range)
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::sort::SortOptions;
use arrow_array::*;
use arrow_schema::DataType;
use std::sync::Arc;
#[test]
fn test_partition_point() {
    let first: &[i32] = &[1, 1, 1, 2, 2, 2, 2, 2, 2, 3, 3, 3, 4];
    let second: &[i32] = &[1, 2, 2, 2, 2, 2, 2, 2, 9];

    // (data, search start, search end, index of the probe value, expected result)
    let cases = [
        (first, 0, first.len(), first.len() / 2, 9),
        (first, 9, first.len(), 9, 12),
        (first, 0, 9, 0, 3),
        (second, 5, 9, 5, 8),
    ];

    for &(data, start, end, probe_idx, expected) in cases.iter() {
        let probe = data[probe_idx];
        assert_eq!(
            expected,
            partition_point(start, end, |i: usize| data[i].cmp(&probe)
                != Ordering::Greater)
        );
    }
}
#[test]
fn test_lexicographical_partition_ranges_empty() {
    // No columns at all (as opposed to columns that contain zero rows).
    let input = vec![];
    assert!(
        lexicographical_partition_ranges(&input).is_err(),
        "lexicographical_partition_ranges should reject an empty list of columns"
    );
}
#[test]
fn test_lexicographical_partition_ranges_unaligned_rows() {
    // Two rows in the first column, only one in the second.
    let two_rows: ArrayRef = Arc::new(Int64Array::from(vec![None, Some(-1)]));
    let one_row: ArrayRef = Arc::new(StringArray::from(vec![Some("foo")]));
    let columns = vec![
        SortColumn {
            values: two_rows,
            options: None,
        },
        SortColumn {
            values: one_row,
            options: None,
        },
    ];

    let outcome = lexicographical_partition_ranges(&columns);
    assert!(
        outcome.is_err(),
        "lexicographical_partition_ranges should reject columns with different row counts"
    );
}
#[test]
fn test_lexicographical_partition_single_column() {
    // One leading value, a run of seven equal values, one trailing value.
    let values: ArrayRef = Arc::new(Int64Array::from(vec![1, 2, 2, 2, 2, 2, 2, 2, 9]));
    let options = Some(SortOptions {
        descending: false,
        nulls_first: true,
    });
    let columns = vec![SortColumn { values, options }];

    let ranges: Vec<Range<usize>> = lexicographical_partition_ranges(&columns)
        .unwrap()
        .collect();
    let expected: Vec<Range<usize>> = vec![0..1, 1..8, 8..9];
    assert_eq!(expected, ranges);
}
#[test]
fn test_lexicographical_partition_all_equal_values() {
    // A thousand copies of the same value form a single partition.
    let values: ArrayRef = Arc::new(Int64Array::from_value(1, 1000));
    let options = Some(SortOptions {
        descending: false,
        nulls_first: true,
    });
    let columns = vec![SortColumn { values, options }];

    let ranges: Vec<Range<usize>> = lexicographical_partition_ranges(&columns)
        .unwrap()
        .collect();
    let expected: Vec<Range<usize>> = vec![0..1000];
    assert_eq!(expected, ranges);
}
#[test]
fn test_lexicographical_partition_all_null_values() {
    // Two all-null columns with opposite `nulls_first` settings still form a
    // single partition covering every row.
    let nulls_first_column = SortColumn {
        values: new_null_array(&DataType::Int8, 1000),
        options: Some(SortOptions {
            descending: false,
            nulls_first: true,
        }),
    };
    let nulls_last_column = SortColumn {
        values: new_null_array(&DataType::UInt16, 1000),
        options: Some(SortOptions {
            descending: false,
            nulls_first: false,
        }),
    };
    let columns = vec![nulls_first_column, nulls_last_column];

    let ranges: Vec<Range<usize>> = lexicographical_partition_ranges(&columns)
        .unwrap()
        .collect();
    let expected: Vec<Range<usize>> = vec![0..1000];
    assert_eq!(expected, ranges);
}
#[test]
fn test_lexicographical_partition_unique_column_1() {
    // The two rows already differ in the first column, so each row is its own
    // partition.
    let ints: ArrayRef = Arc::new(Int64Array::from(vec![None, Some(-1)]));
    let strings: ArrayRef = Arc::new(StringArray::from(vec![Some("foo"), Some("bar")]));
    let columns = vec![
        SortColumn {
            values: ints,
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        },
        SortColumn {
            values: strings,
            options: Some(SortOptions {
                descending: true,
                nulls_first: true,
            }),
        },
    ];

    let ranges: Vec<Range<usize>> = lexicographical_partition_ranges(&columns)
        .unwrap()
        .collect();
    let expected: Vec<Range<usize>> = vec![0..1, 1..2];
    assert_eq!(expected, ranges);
}
#[test]
fn test_lexicographical_partition_unique_column_2() {
    // Rows 1 and 2 tie on the first column but differ on the second, so every
    // row still ends up in its own partition.
    let ints: ArrayRef = Arc::new(Int64Array::from(vec![None, Some(-1), Some(-1)]));
    let strings: ArrayRef = Arc::new(StringArray::from(vec![
        Some("foo"),
        Some("bar"),
        Some("apple"),
    ]));
    let columns = vec![
        SortColumn {
            values: ints,
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        },
        SortColumn {
            values: strings,
            options: Some(SortOptions {
                descending: true,
                nulls_first: true,
            }),
        },
    ];

    let ranges: Vec<Range<usize>> = lexicographical_partition_ranges(&columns)
        .unwrap()
        .collect();
    let expected: Vec<Range<usize>> = vec![0..1, 1..2, 2..3];
    assert_eq!(expected, ranges);
}
#[test]
fn test_lexicographical_partition_non_unique_column_1() {
    // Rows 1 and 2 are equal in both columns and must share a partition.
    let ints: ArrayRef = Arc::new(Int64Array::from(vec![
        None,
        Some(-1),
        Some(-1),
        Some(1),
    ]));
    let strings: ArrayRef = Arc::new(StringArray::from(vec![
        Some("foo"),
        Some("bar"),
        Some("bar"),
        Some("bar"),
    ]));
    let columns = vec![
        SortColumn {
            values: ints,
            options: Some(SortOptions {
                descending: false,
                nulls_first: true,
            }),
        },
        SortColumn {
            values: strings,
            options: Some(SortOptions {
                descending: true,
                nulls_first: true,
            }),
        },
    ];

    let ranges: Vec<Range<usize>> = lexicographical_partition_ranges(&columns)
        .unwrap()
        .collect();
    let expected: Vec<Range<usize>> = vec![0..1, 1..3, 3..4];
    assert_eq!(expected, ranges);
}
}