use crate::ArrowNativeType;
use crate::buffer::ScalarBuffer;
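/// A slice-able buffer of monotonically increasing, positive integers used to
/// store the run-ends of a run-end encoded array.
///
/// Each value in `run_ends` is the exclusive end of a run, i.e. the cumulative
/// length of all runs up to and including that physical index. The
/// `logical_offset` and `logical_length` describe a logical slice of the
/// run-encoded values, allowing zero-copy slicing.
///
/// For example, run-ends `[3, 4, 6]` describe three runs covering the logical
/// indices `0..3`, `3..4` and `4..6`.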
#[derive(Debug, Clone)]
pub struct RunEndBuffer<E: ArrowNativeType> {
run_ends: ScalarBuffer<E>,
logical_length: usize,
logical_offset: usize,
}
impl<E> RunEndBuffer<E>
where
E: ArrowNativeType,
{
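    /// Create a new [`RunEndBuffer`] from a [`ScalarBuffer`] of run-ends,
    /// a logical offset and a logical length.
    ///
    /// # Panics
    ///
    /// Panics if the run-ends are not strictly increasing and greater than
    /// zero, or if `logical_offset + logical_length` exceeds the last run-end.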
pub fn new(run_ends: ScalarBuffer<E>, logical_offset: usize, logical_length: usize) -> Self {
assert!(
run_ends.windows(2).all(|w| w[0] < w[1]),
"run-ends not strictly increasing"
);
if logical_length != 0 {
assert!(!run_ends.is_empty(), "non-empty slice but empty run-ends");
let end = E::from_usize(logical_offset.saturating_add(logical_length)).unwrap();
assert!(
*run_ends.first().unwrap() > E::usize_as(0),
"run-ends not greater than 0"
);
assert!(
*run_ends.last().unwrap() >= end,
"slice beyond bounds of run-ends"
);
}
Self {
run_ends,
logical_offset,
logical_length,
}
}
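    /// Create a new [`RunEndBuffer`] without validating the run-ends.
    ///
    /// # Safety
    ///
    /// The caller must uphold the invariants checked by [`Self::new`]:
    /// the run-ends must be strictly increasing and greater than zero, and
    /// the last run-end must be at least `logical_offset + logical_length`.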
pub unsafe fn new_unchecked(
run_ends: ScalarBuffer<E>,
logical_offset: usize,
logical_length: usize,
) -> Self {
Self {
run_ends,
logical_offset,
logical_length,
}
}
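    /// Returns the logical offset into the run-ends stored by this buffer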
#[inline]
pub fn offset(&self) -> usize {
self.logical_offset
}
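    /// Returns the logical length of the run-encoded values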
#[inline]
pub fn len(&self) -> usize {
self.logical_length
}
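    /// Returns `true` if the logical length is `0`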
#[inline]
pub fn is_empty(&self) -> bool {
self.logical_length == 0
}
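    /// Frees up unused memory in the underlying run-ends buffer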
pub fn shrink_to_fit(&mut self) {
self.run_ends.shrink_to_fit();
}
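    /// Returns the physical run-ends, ignoring any logical offset or length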
#[inline]
pub fn values(&self) -> &[E] {
&self.run_ends
}
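    /// Returns an iterator over the run-ends that overlap the logical slice,
    /// rebased so that they describe an equivalent buffer with a logical
    /// offset of `0`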
    pub fn sliced_values(&self) -> impl Iterator<Item = E> + '_ {
        let offset = self.logical_offset;
        let len = self.logical_length;
        // Restrict to the physical runs that overlap the logical slice
        let physical_slice = if self.is_empty() {
            &self.run_ends[0..0]
        } else {
            let start = self.get_start_physical_index();
            let end = self.get_end_physical_index();
            &self.run_ends[start..=end]
        };
        // Rebase each run-end on the logical offset, clamping the final
        // run-end to the logical length
        physical_slice.iter().map(move |&val| {
            let val = val.as_usize().saturating_sub(offset).min(len);
            E::from_usize(val).unwrap()
        })
    }
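    /// Returns the last run-end, i.e. the logical length of the un-sliced
    /// run-encoded array, or `0` if there are no run-ends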
#[inline]
pub fn max_value(&self) -> usize {
self.values().last().copied().unwrap_or_default().as_usize()
}
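    /// Performs a binary search to find the physical index of the run
    /// containing `logical_index`
    ///
    /// The result is unspecified if `logical_index >= self.len()`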
    pub fn get_physical_index(&self, logical_index: usize) -> usize {
        let logical_index = E::usize_as(self.logical_offset + logical_index);
        let cmp = |p: &E| p.partial_cmp(&logical_index).unwrap();
        match self.run_ends.binary_search_by(cmp) {
            // Exact match: run-ends are exclusive, so the logical index
            // belongs to the next run
            Ok(idx) => idx + 1,
            Err(idx) => idx,
        }
    }
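    /// Returns the physical index at which the logical slice starts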
pub fn get_start_physical_index(&self) -> usize {
if self.logical_offset == 0 || self.logical_length == 0 {
return 0;
}
        // Fall back to binary search for the first logical element
        self.get_physical_index(0)
}
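    /// Returns the physical index at which the logical slice ends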
pub fn get_end_physical_index(&self) -> usize {
if self.logical_length == 0 {
return 0;
}
        // If the last run-end coincides with the end of the logical slice,
        // the last physical index holds the answer
        if self.max_value() == self.logical_offset + self.logical_length {
            return self.values().len() - 1;
        }
        // Fall back to binary search for the last logical element
        self.get_physical_index(self.logical_length - 1)
}
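    /// Slices this [`RunEndBuffer`] by the given logical offset and length,
    /// without copying the underlying run-ends.
    ///
    /// # Panics
    ///
    /// Panics if `logical_offset + logical_length` exceeds `self.len()`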
pub fn slice(&self, logical_offset: usize, logical_length: usize) -> Self {
assert!(
logical_offset.saturating_add(logical_length) <= self.logical_length,
"the length + offset of the sliced RunEndBuffer cannot exceed the existing length"
);
Self {
run_ends: self.run_ends.clone(),
logical_offset: self.logical_offset + logical_offset,
logical_length,
}
}
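    /// Returns a reference to the inner [`ScalarBuffer`] of run-ends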
pub fn inner(&self) -> &ScalarBuffer<E> {
&self.run_ends
}
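    /// Consumes self, returning the inner [`ScalarBuffer`] of run-ends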
pub fn into_inner(self) -> ScalarBuffer<E> {
self.run_ends
}
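    /// Returns the physical index of the run containing each of the given
    /// logical indices, in the order the indices were provided.
    ///
    /// Returns an out-of-bounds logical index as an error if any index is
    /// `>= self.len()`.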
#[inline]
pub fn get_physical_indices<I>(&self, logical_indices: &[I]) -> Result<Vec<usize>, I>
where
I: ArrowNativeType,
{
let len = self.len();
let offset = self.offset();
let indices_len = logical_indices.len();
if indices_len == 0 {
return Ok(vec![]);
}
        // Sort the logical indices, tracking each index's original position,
        // so the run-ends can be scanned in a single pass
        let mut ordered_indices: Vec<usize> = (0..indices_len).collect();
        ordered_indices.sort_unstable_by(|lhs, rhs| {
            logical_indices[*lhs]
                .partial_cmp(&logical_indices[*rhs])
                .unwrap()
        });
        // Validate that the largest logical index is in bounds
        let largest_logical_index = logical_indices[*ordered_indices.last().unwrap()].as_usize();
        if largest_logical_index >= len {
            return Err(logical_indices[*ordered_indices.last().unwrap()]);
        }
        // Skip the physical runs that fall entirely before the logical offset
        let skip_value = self.get_start_physical_index();
        let mut physical_indices = vec![0; indices_len];
        let mut ordered_index = 0_usize;
        for (physical_index, run_end) in self.values().iter().enumerate().skip(skip_value) {
            // The run-end of the current run, relative to the logical offset
            let run_end_value = run_end.as_usize() - offset;
            // Assign this physical index to every requested logical index
            // that falls within the current run
            while ordered_index < indices_len
                && logical_indices[ordered_indices[ordered_index]].as_usize() < run_end_value
            {
                physical_indices[ordered_indices[ordered_index]] = physical_index;
                ordered_index += 1;
            }
        }
        // All indices were validated above, so every index should be assigned
        if ordered_index < logical_indices.len() {
            return Err(logical_indices[ordered_indices[ordered_index]]);
        }
Ok(physical_indices)
}
}
#[cfg(test)]
mod tests {
use crate::buffer::RunEndBuffer;
#[test]
fn test_zero_length_slice() {
let buffer = RunEndBuffer::new(vec![1_i32, 4_i32].into(), 0, 4);
assert_eq!(buffer.get_start_physical_index(), 0);
assert_eq!(buffer.get_end_physical_index(), 1);
assert_eq!(buffer.get_physical_index(3), 1);
for offset in 0..4 {
let sliced = buffer.slice(offset, 0);
assert_eq!(sliced.get_start_physical_index(), 0);
assert_eq!(sliced.get_end_physical_index(), 0);
}
let buffer = RunEndBuffer::new(Vec::<i32>::new().into(), 0, 0);
assert_eq!(buffer.get_start_physical_index(), 0);
assert_eq!(buffer.get_end_physical_index(), 0);
}
#[test]
fn test_sliced_values() {
let buffer = RunEndBuffer::new(vec![2i32, 3, 6].into(), 0, 6);
let sliced = buffer.slice(1, 4);
let sliced_values: Vec<i32> = sliced.sliced_values().collect();
assert_eq!(sliced_values, &[1, 2, 4]);
let sliced = buffer.slice(4, 2);
let sliced_values: Vec<i32> = sliced.sliced_values().collect();
assert_eq!(sliced_values, &[2]);
}
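    // A minimal additional check (not part of the original suite) sketching how
    // `get_physical_indices` maps logical indices to physical run indices and
    // reports an out-of-bounds index as the error value.
    #[test]
    fn test_get_physical_indices() {
        let buffer = RunEndBuffer::new(vec![2_i32, 3, 6].into(), 0, 6);
        // Runs cover logical 0..2 -> physical 0, 2..3 -> physical 1, 3..6 -> physical 2
        let physical = buffer.get_physical_indices(&[5_i32, 0, 3, 2]).unwrap();
        // Results come back in the order the logical indices were provided
        assert_eq!(physical, vec![2, 0, 2, 1]);
        // A logical index beyond the logical length is returned as the error
        assert_eq!(buffer.get_physical_indices(&[6_i32]), Err(6));
    }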
}