use parquet2::encoding::hybrid_rle::encode_u32;
use parquet2::write::Version;
use crate::{
array::Offset,
bitmap::{utils::BitmapIter, Bitmap},
error::Result,
};
pub fn num_values<O: Offset>(offsets: &[O]) -> usize {
offsets
.windows(2)
.map(|w| {
let length = w[1].to_usize() - w[0].to_usize();
if length == 0 {
1
} else {
length
}
})
.sum()
}
#[derive(Debug)]
pub struct RepLevelsIter<'a, O: Offset> {
iter: std::slice::Windows<'a, O>,
remaining: usize,
length: usize,
total_size: usize,
}
impl<'a, O: Offset> RepLevelsIter<'a, O> {
pub fn new(offsets: &'a [O]) -> Self {
let total_size = num_values(offsets);
Self {
iter: offsets.windows(2),
remaining: 0,
length: 0,
total_size,
}
}
}
impl<O: Offset> Iterator for RepLevelsIter<'_, O> {
type Item = u32;
fn next(&mut self) -> Option<Self::Item> {
if self.remaining == self.length {
if let Some(w) = self.iter.next() {
let start = w[0].to_usize();
let end = w[1].to_usize();
self.length = end - start;
self.remaining = 0;
if self.length == 0 {
self.total_size -= 1;
return Some(0);
}
} else {
return None;
}
}
let old = self.remaining;
self.remaining += 1;
self.total_size -= 1;
Some((old >= 1) as u32)
}
fn size_hint(&self) -> (usize, Option<usize>) {
(self.total_size, Some(self.total_size))
}
}
pub struct DefLevelsIter<'a, O: Offset> {
iter: std::iter::Zip<std::slice::Windows<'a, O>, Box<dyn Iterator<Item = bool> + 'a>>,
primitive_validity: Option<BitmapIter<'a>>,
remaining: usize,
is_valid: bool,
length: usize,
total_size: usize,
}
impl<'a, O: Offset> DefLevelsIter<'a, O> {
pub fn new(
offsets: &'a [O],
validity: Option<&'a Bitmap>,
primitive_validity: Option<&'a Bitmap>,
) -> Self {
let total_size = num_values(offsets);
let primitive_validity = primitive_validity.map(|x| x.iter());
let validity = validity
.map(|x| Box::new(x.iter()) as Box<dyn Iterator<Item = bool>>)
.unwrap_or_else(|| {
Box::new(std::iter::repeat(true).take(offsets.len() - 1))
as Box<dyn Iterator<Item = bool>>
});
Self {
iter: offsets.windows(2).zip(validity),
primitive_validity,
remaining: 0,
length: 0,
is_valid: false,
total_size,
}
}
}
impl<O: Offset> Iterator for DefLevelsIter<'_, O> {
type Item = u32;
fn next(&mut self) -> Option<Self::Item> {
if self.remaining == self.length {
if let Some((w, is_valid)) = self.iter.next() {
let start = w[0].to_usize();
let end = w[1].to_usize();
self.length = end - start;
self.remaining = 0;
self.is_valid = is_valid;
if self.length == 0 {
self.total_size -= 1;
return Some(is_valid as u32);
}
} else {
return None;
}
}
self.remaining += 1;
self.total_size -= 1;
let (base_def, p_is_valid) = self
.primitive_validity
.as_mut()
.map(|x| (1, x.next().unwrap() as u32))
.unwrap_or((0, 0));
let def_ = (base_def + 1) * self.is_valid as u32 + p_is_valid;
Some(def_)
}
fn size_hint(&self) -> (usize, Option<usize>) {
(self.total_size, Some(self.total_size))
}
}
#[derive(Debug)]
pub struct NestedInfo<'a, O: Offset> {
_is_optional: bool,
offsets: &'a [O],
validity: Option<&'a Bitmap>,
}
impl<'a, O: Offset> NestedInfo<'a, O> {
pub fn new(offsets: &'a [O], validity: Option<&'a Bitmap>, is_optional: bool) -> Self {
Self {
_is_optional: is_optional,
offsets,
validity,
}
}
pub fn offsets(&self) -> &'a [O] {
self.offsets
}
}
fn write_levels_v1<F: Fn(&mut Vec<u8>) -> Result<()>>(
buffer: &mut Vec<u8>,
encode: F,
) -> Result<()> {
buffer.extend_from_slice(&[0; 4]);
let start = buffer.len();
encode(buffer)?;
let end = buffer.len();
let length = end - start;
let length = (length as i32).to_le_bytes();
(0..4).for_each(|i| buffer[start - 4 + i] = length[i]);
Ok(())
}
pub fn write_rep_levels<O: Offset>(
buffer: &mut Vec<u8>,
nested: &NestedInfo<O>,
version: Version,
) -> Result<()> {
let num_bits = 1;
match version {
Version::V1 => {
write_levels_v1(buffer, |buffer: &mut Vec<u8>| {
let levels = RepLevelsIter::new(nested.offsets);
encode_u32(buffer, levels, num_bits)?;
Ok(())
})?;
}
Version::V2 => {
let levels = RepLevelsIter::new(nested.offsets);
encode_u32(buffer, levels, num_bits)?;
}
}
Ok(())
}
pub fn write_def_levels<O: Offset>(
buffer: &mut Vec<u8>,
nested: &NestedInfo<O>,
validity: Option<&Bitmap>,
version: Version,
) -> Result<()> {
let num_bits = 2;
match version {
Version::V1 => {
write_levels_v1(buffer, |buffer: &mut Vec<u8>| {
let levels = DefLevelsIter::new(nested.offsets, nested.validity, validity);
encode_u32(buffer, levels, num_bits)?;
Ok(())
})?;
}
Version::V2 => {
let levels = DefLevelsIter::new(nested.offsets, nested.validity, validity);
encode_u32(buffer, levels, num_bits)?;
}
}
Ok(())
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_rep_levels() {
let offsets = [0, 2, 2, 5, 8, 8, 11, 11, 12].as_ref();
let expected = vec![0u32, 1, 0, 0, 1, 1, 0, 1, 1, 0, 0, 1, 1, 0, 0];
let result = RepLevelsIter::new(offsets).collect::<Vec<_>>();
assert_eq!(result, expected)
}
#[test]
fn test_def_levels() {
let offsets = [0, 2, 2, 5, 8, 8, 11, 11, 12].as_ref();
let validity = Some(Bitmap::from([
true, false, true, true, true, true, false, true,
]));
let primitive_validity = Some(Bitmap::from([
true, true, true, false, true, true, true, true, true, true, true, true, ]));
let expected = vec![3u32, 3, 0, 3, 2, 3, 3, 3, 3, 1, 3, 3, 3, 0, 3];
let result = DefLevelsIter::new(offsets, validity.as_ref(), primitive_validity.as_ref())
.collect::<Vec<_>>();
assert_eq!(result, expected)
}
}