use bytes::Bytes;
use crate::encodings::rle::RleDecoder;
use crate::errors::Result;
/// Decodes a stream of `i32` indices (dictionary indices, going by the type
/// name) from an [`RleDecoder`], staging them in a fixed-size buffer and
/// enforcing an upper bound on how many values may be consumed in total.
pub struct DictIndexDecoder {
/// Underlying decoder the indices are pulled from (`get_batch`) or skipped in (`skip`).
decoder: RleDecoder,
/// Staging buffer that `get_batch` fills; boxed so the 1024-element array is
/// not stored inline in the struct.
index_buf: Box<[i32; 1024]>,
/// Number of entries at the front of `index_buf` that the last `get_batch` call produced.
index_buf_len: usize,
/// Position in `index_buf` of the next entry that has not been consumed yet.
/// The buffer is drained when this equals `index_buf_len`.
index_offset: usize,
/// Upper bound on the number of values that may still be read or skipped.
/// Initialised from `num_values`, falling back to `num_levels`.
max_remaining_values: usize,
}
impl DictIndexDecoder {
    /// Creates a decoder over `data`.
    ///
    /// The first byte of `data` is used as the bit width handed to
    /// [`RleDecoder::new`]; the remaining bytes are passed to the decoder as
    /// its input. At most `num_values` values (or `num_levels` when
    /// `num_values` is `None`) can subsequently be read or skipped.
    ///
    /// # Errors
    ///
    /// Returns any error reported by the underlying decoder's `set_data`.
    ///
    /// # Panics
    ///
    /// Panics if `data` is empty, because the leading bit-width byte is
    /// obtained by indexing.
    pub fn new(data: Bytes, num_levels: usize, num_values: Option<usize>) -> Result<Self> {
        let bit_width = data[0];
        let mut decoder = RleDecoder::new(bit_width);
        decoder.set_data(data.slice(1..))?;
        Ok(Self {
            decoder,
            index_buf: Box::new([0; 1024]),
            index_buf_len: 0,
            index_offset: 0,
            max_remaining_values: num_values.unwrap_or(num_levels),
        })
    }

    /// Reads up to `len` indices, handing them to `f` in chunks.
    ///
    /// `f` may be invoked several times; each slice holds at most as many
    /// entries as the internal buffer (1024). Reading stops early when the
    /// remaining-value budget is used up or the underlying decoder yields no
    /// more values.
    ///
    /// Returns the number of indices that were passed to `f`.
    ///
    /// # Errors
    ///
    /// Propagates errors from the underlying decoder and from `f`. Chunks
    /// that `f` accepted before the failure stay consumed, and the
    /// remaining-value budget reflects them.
    #[inline(always)]
    pub fn read<F: FnMut(&[i32]) -> Result<()>>(&mut self, len: usize, mut f: F) -> Result<usize> {
        let total_to_read = len.min(self.max_remaining_values);
        let mut values_read = 0;
        let index_buf = self.index_buf.as_mut();
        while values_read < total_to_read {
            if self.index_offset == self.index_buf_len {
                // Buffer is drained: refill it from the decoder.
                let read = self.decoder.get_batch(index_buf)?;
                if read == 0 {
                    break;
                }
                self.index_buf_len = read;
                self.index_offset = 0;
            }
            let available = self.index_buf_len - self.index_offset;
            let n = available.min(total_to_read - values_read);
            f(&index_buf[self.index_offset..self.index_offset + n])?;
            // Update the budget together with the offset, chunk by chunk, so
            // that an early return through `?` on a later chunk cannot leave
            // the two out of sync (`skip` keeps its bookkeeping the same way).
            self.index_offset += n;
            self.max_remaining_values -= n;
            values_read += n;
        }
        Ok(values_read)
    }

    /// Skips up to `to_skip` indices without handing them to anyone.
    ///
    /// Entries already staged in the internal buffer are discarded first;
    /// once the buffer is drained the request is forwarded to the underlying
    /// decoder's `skip`. Skipping stops early when the remaining-value budget
    /// is used up or the decoder reports that it skipped nothing.
    ///
    /// Returns the number of indices actually skipped.
    ///
    /// # Errors
    ///
    /// Propagates errors from the underlying decoder.
    pub fn skip(&mut self, to_skip: usize) -> Result<usize> {
        let to_skip = to_skip.min(self.max_remaining_values);
        let mut values_skip = 0;
        while values_skip < to_skip {
            if self.index_offset == self.index_buf_len {
                // Nothing buffered: let the decoder skip directly.
                let skip = self.decoder.skip(to_skip - values_skip)?;
                if skip == 0 {
                    break;
                }
                self.max_remaining_values -= skip;
                values_skip += skip;
            } else {
                // Discard already-decoded entries from the buffer first.
                let skip = (to_skip - values_skip).min(self.index_buf_len - self.index_offset);
                self.index_offset += skip;
                self.max_remaining_values -= skip;
                values_skip += skip;
            }
        }
        Ok(values_skip)
    }
}