use std::{iter::Zip, sync::Arc};
use arrow_array::OffsetSizeTrait;
use arrow_buffer::{
ArrowNativeType, BooleanBuffer, BooleanBufferBuilder, NullBuffer, OffsetBuffer, ScalarBuffer,
};
use lance_core::{utils::bit::log_2_ceil, Error, Result};
use snafu::{location, Location};
/// Buffer of repetition or definition levels, stored as one `u16` per entry.
pub type LevelBuffer = Vec<u16>;
/// One layer of raw (not yet flattened) repetition/definition information as
/// collected by `RepDefBuilder`.
#[derive(Clone, Debug)]
enum RawRepDef {
/// Offsets of a list layer, widened to `i64`.
Offsets(Arc<[i64]>),
/// Validity bitmap of a layer that was added with an explicit null buffer.
Validity(BooleanBuffer),
/// A layer added without a validity bitmap; the payload is the number of
/// entries in that layer.
NoNull(usize),
}
/// Flattened repetition and definition levels.
///
/// Either buffer is `None` when that kind of level was not produced.
#[derive(Debug)]
pub struct SerializedRepDefs {
    /// Repetition levels, when present.
    pub repetition_levels: Option<LevelBuffer>,
    /// Definition levels, when present.
    pub definition_levels: Option<LevelBuffer>,
}

impl SerializedRepDefs {
    /// Creates a value that carries neither repetition nor definition levels.
    pub fn empty() -> Self {
        let (repetition_levels, definition_levels) = (None, None);
        Self {
            repetition_levels,
            definition_levels,
        }
    }
}
/// Mutable state used while flattening builder layers into repetition and
/// definition level buffers.
///
/// `RepDefBuilder::serialize` feeds layers to this context starting with the
/// layer that was added last and finishing with the layer that was added first.
struct SerializerContext {
    /// Boundaries of the most recently recorded list layer, expressed as
    /// positions in the level buffers. Always includes the final end position
    /// so that `windows(2)` yields one range per list.
    last_offsets: Option<Arc<[i64]>>,
    /// Repetition levels written so far.
    rep_levels: LevelBuffer,
    /// Definition levels written so far (left empty until first needed).
    def_levels: LevelBuffer,
    /// Level that the next offsets layer will write.
    current_rep: u16,
    /// Level that the next validity layer will write.
    current_def: u16,
    /// Set once any validity bitmap has been recorded.
    has_nulls: bool,
}

impl SerializerContext {
    /// Creates a context with buffers pre-sized for `len` entries.
    ///
    /// `has_nulls` only controls whether the definition buffer is
    /// pre-allocated; the `has_nulls` field itself starts out `false` and is
    /// flipped by [`Self::record_validity`].
    fn new(len: usize, has_nulls: bool) -> Self {
        let def_levels = if has_nulls {
            LevelBuffer::with_capacity(len)
        } else {
            LevelBuffer::default()
        };
        Self {
            last_offsets: None,
            rep_levels: LevelBuffer::with_capacity(len),
            def_levels,
            current_rep: 1,
            current_def: 1,
            has_nulls: false,
        }
    }

    /// Zero-fills the definition buffer if it has not been sized yet.
    ///
    /// Once an offsets layer has been recorded the buffer must hold one entry
    /// per position in the level buffers (the final recorded offset), not one
    /// entry per list, so `layer_len` is only used before any offsets are seen.
    fn ensure_def_levels(&mut self, layer_len: usize) {
        if !self.def_levels.is_empty() {
            return;
        }
        let num_entries = match &self.last_offsets {
            Some(boundaries) => boundaries.last().map_or(0, |end| *end as usize),
            None => layer_len,
        };
        self.def_levels.resize(num_entries, 0);
    }

    /// Records a layer of `len` entries that has no validity bitmap.
    ///
    /// The layer still consumes a definition level so that later layers line up.
    fn record_all_valid(&mut self, len: usize) {
        self.current_def += 1;
        self.ensure_def_levels(len);
    }

    /// Records a list layer, stamping the current repetition level at the
    /// position where each list starts.
    ///
    /// # Panics
    ///
    /// Panics if `offsets` is empty.
    ///
    /// NOTE(review): empty lists do not appear to be supported here; a start
    /// offset equal to the final offset indexes past the repetition buffer.
    fn record_offsets(&mut self, offsets: &Arc<[i64]>) {
        let rep_level = self.current_rep;
        self.current_rep += 1;
        let (end, starts) = offsets
            .split_last()
            .expect("offsets always contain at least one entry");
        if let Some(last_offsets) = &self.last_offsets {
            // These offsets index into the previously recorded list layer, so
            // translate them into positions in the level buffers.
            let mut translated = Vec::with_capacity(offsets.len());
            for start in starts {
                let position = last_offsets[*start as usize];
                translated.push(position);
                self.rep_levels[position as usize] = rep_level;
            }
            // Keep the final end position as well. Without it the last list of
            // this layer has no closing boundary and a later validity layer
            // would never see (and so never mark) a null final list.
            translated.push(last_offsets[*end as usize]);
            self.last_offsets = Some(translated.into());
        } else {
            self.rep_levels.resize(*end as usize, 0);
            for start in starts {
                self.rep_levels[*start as usize] = rep_level;
            }
            self.last_offsets = Some(offsets.clone());
        }
    }

    /// Records a validity layer, writing the current definition level for
    /// every null entry.
    ///
    /// When a list layer has already been recorded, each validity bit covers a
    /// whole list, so the level is written across that list's range.
    fn record_validity(&mut self, validity: &BooleanBuffer) {
        self.has_nulls = true;
        let def_level = self.current_def;
        self.current_def += 1;
        self.ensure_def_levels(validity.len());
        if let Some(last_offsets) = &self.last_offsets {
            for (bounds, valid) in last_offsets.windows(2).zip(validity.iter()) {
                if !valid {
                    self.def_levels[bounds[0] as usize..bounds[1] as usize].fill(def_level);
                }
            }
        } else {
            for (def, valid) in self.def_levels.iter_mut().zip(validity.iter()) {
                if !valid {
                    *def = def_level;
                }
            }
        }
    }

    /// Finishes serialization.
    ///
    /// Definition levels are returned only if a validity layer was recorded;
    /// repetition levels only if an offsets layer was recorded.
    fn build(self) -> SerializedRepDefs {
        SerializedRepDefs {
            definition_levels: if self.has_nulls {
                Some(self.def_levels)
            } else {
                None
            },
            repetition_levels: if self.current_rep > 1 {
                Some(self.rep_levels)
            } else {
                None
            },
        }
    }
}
/// Collects the validity and offsets layers of a (possibly nested) array so
/// they can later be flattened into repetition and definition levels.
///
/// Layers are added from the outermost layer to the innermost one;
/// [`RepDefBuilder::serialize`] walks them in reverse.
#[derive(Clone, Default)]
pub struct RepDefBuilder {
    /// Layers recorded so far, in the order they were added.
    repdefs: Vec<RawRepDef>,
    /// Number of entries the next layer is expected to describe. Only set by
    /// validity-bitmap and offsets layers.
    len: Option<usize>,
}

impl RepDefBuilder {
    /// Asserts that `validity` matches the expected length (if one is known)
    /// and records its length as the new expectation.
    fn check_validity_len(&mut self, validity: &NullBuffer) {
        if let Some(len) = self.len {
            assert_eq!(validity.len(), len);
        }
        self.len = Some(validity.len());
    }

    /// Number of layers recorded so far.
    fn num_layers(&self) -> usize {
        self.repdefs.len()
    }

    /// Returns `true` when every recorded layer is a no-null layer (which is
    /// also the case when nothing has been recorded).
    fn is_empty(&self) -> bool {
        self.repdefs
            .iter()
            .all(|r| matches!(r, RawRepDef::NoNull(_)))
    }

    /// Returns `true` when the builder holds exactly one layer and that layer
    /// is a validity bitmap.
    pub fn is_simple_validity(&self) -> bool {
        self.repdefs.len() == 1 && matches!(self.repdefs[0], RawRepDef::Validity(_))
    }

    /// Returns `true` when at least one layer is a validity bitmap.
    pub fn has_nulls(&self) -> bool {
        self.repdefs
            .iter()
            .any(|rd| matches!(rd, RawRepDef::Validity(_)))
    }

    /// Adds a validity layer.
    ///
    /// # Panics
    ///
    /// Panics if the bitmap length differs from the length implied by the
    /// previously added validity/offsets layer.
    pub fn add_validity_bitmap(&mut self, validity: NullBuffer) {
        self.check_validity_len(&validity);
        self.repdefs
            .push(RawRepDef::Validity(validity.into_inner()));
    }

    /// Adds a layer of `len` entries that has no validity bitmap.
    pub fn add_no_null(&mut self, len: usize) {
        self.repdefs.push(RawRepDef::NoNull(len));
    }

    /// Asserts that `offsets` describes the expected number of lists (if known)
    /// and records the final offset as the expected length of the next layer.
    fn check_offset_len(&mut self, offsets: &[i64]) {
        if let Some(len) = self.len {
            assert_eq!(offsets.len(), len + 1);
        }
        self.len = Some(offsets[offsets.len() - 1] as usize);
    }

    /// Adds a list layer described by `repetition`; 32-bit offsets are widened
    /// to `i64`.
    ///
    /// # Panics
    ///
    /// Panics if the number of lists differs from the length implied by the
    /// previously added validity/offsets layer.
    pub fn add_offsets<O: OffsetSizeTrait>(&mut self, repetition: OffsetBuffer<O>) {
        if O::IS_LARGE {
            let inner = repetition.into_inner();
            let len = inner.len();
            let i64_buff = ScalarBuffer::new(inner.into_inner(), 0, len);
            let offsets = Vec::from(i64_buff);
            self.check_offset_len(&offsets);
            self.repdefs.push(RawRepDef::Offsets(offsets.into()));
        } else {
            let inner = repetition.into_inner();
            let len = inner.len();
            let casted = ScalarBuffer::<i32>::new(inner.into_inner(), 0, len)
                .iter()
                .copied()
                .map(|o| o as i64)
                .collect::<Vec<_>>();
            self.check_offset_len(&casted);
            self.repdefs.push(RawRepDef::Offsets(casted.into()));
        }
    }

    /// Best-effort number of innermost entries, used only to pre-size buffers.
    ///
    /// `len` is never set by `add_no_null`, so a builder that only holds
    /// no-null layers falls back to the size of its last layer instead of
    /// panicking.
    fn capacity_hint(&self) -> usize {
        self.len.unwrap_or_else(|| match self.repdefs.last() {
            Some(RawRepDef::NoNull(num_valid)) => *num_valid,
            _ => 0,
        })
    }

    /// Concatenates the same layer taken from several builders into one layer.
    ///
    /// Validity and no-null layers may be mixed; the result is a validity
    /// layer if any input carried a bitmap. Offsets layers are stitched
    /// together by shifting each subsequent buffer by the running end offset.
    /// `len` is a capacity hint.
    ///
    /// # Panics
    ///
    /// Panics if `layers` is empty or if offsets layers are mixed with
    /// validity/no-null layers.
    fn concat_layers<'a>(mut layers: impl Iterator<Item = &'a RawRepDef>, len: usize) -> RawRepDef {
        let first = layers.next().unwrap();
        match first {
            RawRepDef::NoNull(_) | RawRepDef::Validity(_) => {
                let mut has_nulls = false;
                let mut builder = BooleanBufferBuilder::new(len);
                for layer in std::iter::once(first).chain(layers) {
                    match layer {
                        RawRepDef::NoNull(num_valid) => {
                            builder.append_n(*num_valid, true);
                        }
                        RawRepDef::Validity(validity) => {
                            has_nulls = true;
                            builder.append_buffer(validity);
                        }
                        RawRepDef::Offsets(_) => unreachable!(),
                    }
                }
                if has_nulls {
                    RawRepDef::Validity(builder.finish())
                } else {
                    RawRepDef::NoNull(builder.len())
                }
            }
            RawRepDef::Offsets(offsets) => {
                let mut all_offsets = Vec::with_capacity(len);
                all_offsets.extend(offsets.iter().copied());
                for layer in layers {
                    let last = *all_offsets.last().unwrap();
                    let RawRepDef::Offsets(offsets) = layer else {
                        unreachable!()
                    };
                    // Skip the leading offset of each following buffer; it
                    // coincides with the end of the previous one.
                    all_offsets.extend(offsets.iter().skip(1).map(|off| *off + last));
                }
                RawRepDef::Offsets(all_offsets.into())
            }
        }
    }

    /// Flattens the layers of `builders` (treated as consecutive chunks of the
    /// same column) into repetition and definition levels.
    ///
    /// Returns [`SerializedRepDefs::empty`] when there are no builders or when
    /// every builder only holds no-null layers. All builders are expected to
    /// have the same number of layers (checked in debug builds).
    pub fn serialize(builders: Vec<Self>) -> SerializedRepDefs {
        if builders.is_empty() {
            return SerializedRepDefs::empty();
        }
        if builders.iter().all(|b| b.is_empty()) {
            return SerializedRepDefs::empty();
        }
        let has_nulls = builders.iter().any(|b| b.has_nulls());
        // Only a capacity hint, so it must not panic for builders whose `len`
        // was never set (those that only received `add_no_null` layers).
        let total_len: usize = builders.iter().map(|b| b.capacity_hint()).sum();
        let mut context = SerializerContext::new(total_len, has_nulls);
        debug_assert!(builders
            .iter()
            .all(|b| b.num_layers() == builders[0].num_layers()));
        // Walk from the last layer added to the first.
        for layer_index in (0..builders[0].num_layers()).rev() {
            let layer =
                Self::concat_layers(builders.iter().map(|b| &b.repdefs[layer_index]), total_len);
            match layer {
                RawRepDef::Validity(def) => {
                    context.record_validity(&def);
                }
                RawRepDef::Offsets(rep) => {
                    context.record_offsets(&rep);
                }
                RawRepDef::NoNull(len) => {
                    context.record_all_valid(len);
                }
            }
        }
        context.build()
    }
}
/// Rebuilds offsets and validity buffers from flattened repetition and
/// definition levels, one layer per call.
///
/// The level buffers are consumed in place: each call to `unravel_offsets`
/// compacts them so they describe the next layer.
#[derive(Debug)]
pub struct RepDefUnraveler {
// Remaining repetition levels; `None` when the data has no repetition.
rep_levels: Option<LevelBuffer>,
// Remaining definition levels; `None` when the data has no nulls.
def_levels: Option<LevelBuffer>,
// Threshold used by the next `unravel_validity` call.
current_def_cmp: u16,
}
impl RepDefUnraveler {
/// Creates an unraveler over the given (optional) level buffers.
pub fn new(rep_levels: Option<LevelBuffer>, def_levels: Option<LevelBuffer>) -> Self {
Self {
rep_levels,
def_levels,
current_def_cmp: 0,
}
}
/// Extracts the offsets of the next list layer.
///
/// An offset is emitted at every position whose repetition level is
/// non-zero, followed by a final offset equal to the number of levels.
/// The level buffers are then compacted to one entry per emitted list, with
/// each kept repetition level decremented by one.
///
/// # Errors
///
/// Returns an invalid-input error when a position cannot be represented
/// as `T`.
///
/// # Panics
///
/// Panics if there are no repetition levels, or if the repetition and
/// definition buffers have different lengths.
pub fn unravel_offsets<T: ArrowNativeType>(&mut self) -> Result<OffsetBuffer<T>> {
let rep_levels = self
.rep_levels
.as_mut()
.expect("Expected repetition level but data didn't contain repetition");
let mut offsets: Vec<T> = Vec::with_capacity(rep_levels.len() + 1);
let mut curlen: usize = 0;
let to_offset = |val: usize| {
T::from_usize(val)
.ok_or_else(|| Error::invalid_input("A single batch had more than i32::MAX values and so a large container type is required", location!()))
};
if let Some(def_levels) = &mut self.def_levels {
assert!(rep_levels.len() == def_levels.len());
let mut read_idx = 0;
let mut write_idx = 0;
while read_idx < rep_levels.len() {
// SAFETY: `read_idx < rep_levels.len()` by the loop condition and
// `write_idx <= read_idx` because it only advances together with
// `read_idx`. `def_levels` has the same length (asserted above).
unsafe {
let rep_val = *rep_levels.get_unchecked(read_idx);
if rep_val != 0 {
offsets.push(to_offset(curlen)?);
// Keep this entry for the next layer, one repetition level lower,
// and carry its definition level along.
*rep_levels.get_unchecked_mut(write_idx) = rep_val - 1;
*def_levels.get_unchecked_mut(write_idx) =
*def_levels.get_unchecked(read_idx);
write_idx += 1;
}
curlen += 1;
read_idx += 1;
}
}
offsets.push(to_offset(curlen)?);
// One level entry remains per emitted list.
rep_levels.truncate(offsets.len() - 1);
def_levels.truncate(offsets.len() - 1);
Ok(OffsetBuffer::new(ScalarBuffer::from(offsets)))
} else {
// Same walk as above, without definition levels to carry along.
let mut read_idx = 0;
let mut write_idx = 0;
while read_idx < rep_levels.len() {
// SAFETY: `read_idx < rep_levels.len()` by the loop condition and
// `write_idx <= read_idx` because it only advances together with
// `read_idx`.
unsafe {
let rep_val = *rep_levels.get_unchecked(read_idx);
if rep_val != 0 {
offsets.push(to_offset(curlen)?);
*rep_levels.get_unchecked_mut(write_idx) = rep_val - 1;
write_idx += 1;
}
curlen += 1;
read_idx += 1;
}
}
offsets.push(to_offset(curlen)?);
rep_levels.truncate(offsets.len() - 1);
Ok(OffsetBuffer::new(ScalarBuffer::from(offsets)))
}
}
/// Extracts the validity of the next layer.
///
/// An entry is valid when its definition level is less than or equal to the
/// current threshold; the threshold starts at zero and grows by one per call.
/// Returns `None` when there are no definition levels (the threshold is left
/// unchanged in that case) or when every entry is valid.
pub fn unravel_validity(&mut self) -> Option<NullBuffer> {
let Some(def_levels) = &self.def_levels else {
return None;
};
let current_def_cmp = self.current_def_cmp;
self.current_def_cmp += 1;
let validity = BooleanBuffer::from_iter(def_levels.iter().map(|&r| r <= current_def_cmp));
if validity.count_set_bits() == validity.len() {
None
} else {
Some(NullBuffer::new(validity))
}
}
}
/// Packs (repetition, definition) level pairs into little-endian control
/// words of type `W`.
///
/// Each word stores the masked repetition level shifted left by `def_width`
/// bits, plus the masked definition level in the low bits.
#[derive(Debug)]
pub struct BinaryControlWordIterator<I: Iterator<Item = (u16, u16)>, W> {
    /// Source of (repetition, definition) pairs.
    repdef: I,
    /// Number of bits the repetition level is shifted by.
    def_width: usize,
    /// Mask applied to the repetition level before packing.
    rep_mask: u16,
    /// Mask applied to the definition level before packing.
    def_mask: u16,
    /// Bits used by the repetition level (reported to readers).
    bits_rep: u8,
    /// Bits used by the definition level (reported to readers).
    bits_def: u8,
    phantom: std::marker::PhantomData<W>,
}

impl<I: Iterator<Item = (u16, u16)>, W> BinaryControlWordIterator<I, W> {
    /// Pulls the next pair and assembles its control word.
    ///
    /// The word is built in `u32` and truncated by the caller. Shifting inside
    /// the narrower word type overflows when `def_width` equals the word width
    /// (repetition levels present but all zero), which panics in debug builds.
    ///
    /// # Panics
    ///
    /// Panics if the underlying iterator is exhausted.
    fn next_word(&mut self) -> u32 {
        let (rep, def) = self
            .repdef
            .next()
            .expect("control word requested after the levels were exhausted");
        (u32::from(rep & self.rep_mask) << self.def_width) + u32::from(def & self.def_mask)
    }
}

impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u8> {
    /// Appends the next control word to `buf` as a single byte.
    fn append_next(&mut self, buf: &mut Vec<u8>) {
        // Truncation is intended: words of this variant fit in 8 bits.
        let control_word = self.next_word() as u8;
        buf.push(control_word);
    }
}

impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u16> {
    /// Appends the next control word to `buf` as two little-endian bytes.
    fn append_next(&mut self, buf: &mut Vec<u8>) {
        // Truncation is intended: words of this variant fit in 16 bits.
        let control_word = self.next_word() as u16;
        buf.extend_from_slice(&control_word.to_le_bytes());
    }
}

impl<I: Iterator<Item = (u16, u16)>> BinaryControlWordIterator<I, u32> {
    /// Appends the next control word to `buf` as four little-endian bytes.
    fn append_next(&mut self, buf: &mut Vec<u8>) {
        let control_word = self.next_word();
        buf.extend_from_slice(&control_word.to_le_bytes());
    }
}
/// Emits control words that carry a single kind of level (repetition only or
/// definition only), encoded little-endian in words of type `W`.
#[derive(Debug)]
pub struct UnaryControlWordIterator<I: Iterator<Item = u16>, W> {
    /// Source of levels.
    repdef: I,
    /// Mask applied to every level before it is written.
    level_mask: u16,
    /// Bits used by repetition levels (reported to readers).
    bits_rep: u8,
    /// Bits used by definition levels (reported to readers).
    bits_def: u8,
    phantom: std::marker::PhantomData<W>,
}

impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u8> {
    /// Appends the next masked level to `buf` as one byte.
    ///
    /// Panics if the underlying iterator is exhausted.
    fn append_next(&mut self, buf: &mut Vec<u8>) {
        let level = self.repdef.next().unwrap();
        let masked = level & self.level_mask;
        buf.push(masked as u8);
    }
}

impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u16> {
    /// Appends the next masked level to `buf` as two little-endian bytes.
    ///
    /// Panics if the underlying iterator is exhausted.
    fn append_next(&mut self, buf: &mut Vec<u8>) {
        let masked = self.repdef.next().unwrap() & self.level_mask;
        buf.extend_from_slice(&masked.to_le_bytes());
    }
}

impl<I: Iterator<Item = u16>> UnaryControlWordIterator<I, u32> {
    /// Appends the next masked level to `buf` as four little-endian bytes.
    ///
    /// Panics if the underlying iterator is exhausted.
    fn append_next(&mut self, buf: &mut Vec<u8>) {
        let masked = self.repdef.next().unwrap() & self.level_mask;
        let widened = u32::from(masked);
        buf.extend_from_slice(&widened.to_le_bytes());
    }
}
/// Control word iterator used when there are neither repetition nor
/// definition levels; it carries no state.
#[derive(Debug)]
pub struct NilaryControlWordIterator;
/// Returns a mask with the lowest `width` bits set.
///
/// A width of 16 or more yields `u16::MAX`. The naive `(1 << width) - 1`
/// overflows the shift in that case (a panic in debug builds, a zero mask in
/// release builds).
fn get_mask(width: u16) -> u16 {
    1u16.checked_shl(u32::from(width))
        .map_or(u16::MAX, |top_bit| top_bit - 1)
}
/// Type-erased control word iterator; the variant records both which levels
/// are present (both, one, or none) and the width of each emitted word.
#[derive(Debug)]
pub enum ControlWordIterator {
    Binary8(BinaryControlWordIterator<Zip<std::vec::IntoIter<u16>, std::vec::IntoIter<u16>>, u8>),
    Binary16(BinaryControlWordIterator<Zip<std::vec::IntoIter<u16>, std::vec::IntoIter<u16>>, u16>),
    Binary32(BinaryControlWordIterator<Zip<std::vec::IntoIter<u16>, std::vec::IntoIter<u16>>, u32>),
    Unary8(UnaryControlWordIterator<std::vec::IntoIter<u16>, u8>),
    Unary16(UnaryControlWordIterator<std::vec::IntoIter<u16>, u16>),
    Unary32(UnaryControlWordIterator<std::vec::IntoIter<u16>, u32>),
    Nilary(NilaryControlWordIterator),
}

impl ControlWordIterator {
    /// Appends the next control word to `buf`; the nilary variant appends
    /// nothing.
    pub fn append_next(&mut self, buf: &mut Vec<u8>) {
        match self {
            Self::Nilary(_) => {}
            Self::Unary8(words) => words.append_next(buf),
            Self::Unary16(words) => words.append_next(buf),
            Self::Unary32(words) => words.append_next(buf),
            Self::Binary8(words) => words.append_next(buf),
            Self::Binary16(words) => words.append_next(buf),
            Self::Binary32(words) => words.append_next(buf),
        }
    }

    /// Number of bytes each control word occupies (zero for the nilary
    /// variant).
    pub fn bytes_per_word(&self) -> usize {
        match self {
            Self::Nilary(_) => 0,
            Self::Binary8(_) | Self::Unary8(_) => 1,
            Self::Binary16(_) | Self::Unary16(_) => 2,
            Self::Binary32(_) | Self::Unary32(_) => 4,
        }
    }

    /// Returns `(bits_rep, bits_def)` as stored in the wrapped iterator.
    fn bit_widths(&self) -> (u8, u8) {
        match self {
            Self::Nilary(_) => (0, 0),
            Self::Unary8(words) => (words.bits_rep, words.bits_def),
            Self::Unary16(words) => (words.bits_rep, words.bits_def),
            Self::Unary32(words) => (words.bits_rep, words.bits_def),
            Self::Binary8(words) => (words.bits_rep, words.bits_def),
            Self::Binary16(words) => (words.bits_rep, words.bits_def),
            Self::Binary32(words) => (words.bits_rep, words.bits_def),
        }
    }

    /// Number of bits used by the repetition level in each word.
    pub fn bits_rep(&self) -> u8 {
        let (rep_bits, _) = self.bit_widths();
        rep_bits
    }

    /// Number of bits used by the definition level in each word.
    pub fn bits_def(&self) -> u8 {
        let (_, def_bits) = self.bit_widths();
        def_bits
    }
}
pub fn build_control_word_iterator(
rep: Option<Vec<u16>>,
max_rep: u16,
def: Option<Vec<u16>>,
max_def: u16,
) -> ControlWordIterator {
let rep_width = if max_rep == 0 {
0
} else {
log_2_ceil(max_rep as u32) as u16
};
let rep_mask = if max_rep == 0 { 0 } else { get_mask(rep_width) };
let def_width = if max_def == 0 {
0
} else {
log_2_ceil(max_def as u32) as u16
};
let def_mask = if max_def == 0 { 0 } else { get_mask(def_width) };
let total_width = rep_width + def_width;
match (rep, def) {
(Some(rep), Some(def)) => {
let iter = rep.into_iter().zip(def);
let def_width = def_width as usize;
if total_width <= 8 {
ControlWordIterator::Binary8(BinaryControlWordIterator {
repdef: iter,
rep_mask,
def_mask,
def_width,
bits_rep: rep_width as u8,
bits_def: def_width as u8,
phantom: std::marker::PhantomData,
})
} else if total_width <= 16 {
ControlWordIterator::Binary16(BinaryControlWordIterator {
repdef: iter,
rep_mask,
def_mask,
def_width,
bits_rep: rep_width as u8,
bits_def: def_width as u8,
phantom: std::marker::PhantomData,
})
} else {
ControlWordIterator::Binary32(BinaryControlWordIterator {
repdef: iter,
rep_mask,
def_mask,
def_width,
bits_rep: rep_width as u8,
bits_def: def_width as u8,
phantom: std::marker::PhantomData,
})
}
}
(Some(lev), None) => {
let iter = lev.into_iter();
if total_width <= 8 {
ControlWordIterator::Unary8(UnaryControlWordIterator {
repdef: iter,
level_mask: rep_mask,
bits_rep: total_width as u8,
bits_def: 0,
phantom: std::marker::PhantomData,
})
} else if total_width <= 16 {
ControlWordIterator::Unary16(UnaryControlWordIterator {
repdef: iter,
level_mask: rep_mask,
bits_rep: total_width as u8,
bits_def: 0,
phantom: std::marker::PhantomData,
})
} else {
ControlWordIterator::Unary32(UnaryControlWordIterator {
repdef: iter,
level_mask: rep_mask,
bits_rep: total_width as u8,
bits_def: 0,
phantom: std::marker::PhantomData,
})
}
}
(None, Some(lev)) => {
let iter = lev.into_iter();
if total_width <= 8 {
ControlWordIterator::Unary8(UnaryControlWordIterator {
repdef: iter,
level_mask: def_mask,
bits_rep: 0,
bits_def: total_width as u8,
phantom: std::marker::PhantomData,
})
} else if total_width <= 16 {
ControlWordIterator::Unary16(UnaryControlWordIterator {
repdef: iter,
level_mask: def_mask,
bits_rep: 0,
bits_def: total_width as u8,
phantom: std::marker::PhantomData,
})
} else {
ControlWordIterator::Unary32(UnaryControlWordIterator {
repdef: iter,
level_mask: def_mask,
bits_rep: 0,
bits_def: total_width as u8,
phantom: std::marker::PhantomData,
})
}
}
(None, None) => ControlWordIterator::Nilary(NilaryControlWordIterator {}),
}
}
/// Decodes control words back into repetition and/or definition levels.
///
/// The variant encodes which levels are present and how many bytes each word
/// occupies. The `BOTH*` variants carry `(bits_to_shift, mask_to_apply)`: the
/// repetition level is the word shifted right by `bits_to_shift` and the
/// definition level is the word masked with `mask_to_apply`.
#[derive(Copy, Clone, Debug)]
pub enum ControlWordParser {
    BOTH8(u8, u32),
    BOTH16(u8, u32),
    BOTH32(u8, u32),
    REP8,
    REP16,
    REP32,
    DEF8,
    DEF16,
    DEF32,
    NIL,
}

impl ControlWordParser {
    /// Decodes one `WORD_SIZE`-byte little-endian word holding both levels and
    /// pushes them onto `dst_rep` / `dst_def`.
    ///
    /// Panics if `src` holds fewer than `WORD_SIZE` bytes.
    fn parse_both<const WORD_SIZE: u8>(
        src: &[u8],
        dst_rep: &mut Vec<u16>,
        dst_def: &mut Vec<u16>,
        bits_to_shift: u8,
        mask_to_apply: u32,
    ) {
        match WORD_SIZE {
            1 => {
                let word = src[0];
                let rep = word >> bits_to_shift;
                let def = word & (mask_to_apply as u8);
                dst_rep.push(rep as u16);
                dst_def.push(def as u16);
            }
            2 => {
                let word = u16::from_le_bytes([src[0], src[1]]);
                let rep = word >> bits_to_shift;
                let def = word & mask_to_apply as u16;
                dst_rep.push(rep);
                dst_def.push(def);
            }
            4 => {
                let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
                let rep = word >> bits_to_shift;
                let def = word & mask_to_apply;
                dst_rep.push(rep as u16);
                dst_def.push(def as u16);
            }
            _ => unreachable!(),
        }
    }

    /// Decodes one `WORD_SIZE`-byte little-endian word holding a single level
    /// and pushes it onto `dst`.
    ///
    /// Panics if `src` holds fewer than `WORD_SIZE` bytes.
    fn parse_one<const WORD_SIZE: u8>(src: &[u8], dst: &mut Vec<u16>) {
        match WORD_SIZE {
            1 => {
                let word = src[0];
                dst.push(word as u16);
            }
            2 => {
                let word = u16::from_le_bytes([src[0], src[1]]);
                dst.push(word);
            }
            4 => {
                let word = u32::from_le_bytes([src[0], src[1], src[2], src[3]]);
                dst.push(word as u16);
            }
            _ => unreachable!(),
        }
    }

    /// Number of bytes each control word occupies (zero for `NIL`).
    pub fn bytes_per_word(&self) -> usize {
        match self {
            Self::BOTH8(..) => 1,
            Self::BOTH16(..) => 2,
            Self::BOTH32(..) => 4,
            Self::REP8 => 1,
            Self::REP16 => 2,
            Self::REP32 => 4,
            Self::DEF8 => 1,
            Self::DEF16 => 2,
            Self::DEF32 => 4,
            Self::NIL => 0,
        }
    }

    /// Decodes the control word at the start of `src`, appending the decoded
    /// level(s) to `dst_rep` and/or `dst_def`. `NIL` appends nothing.
    ///
    /// Panics if `src` is shorter than [`Self::bytes_per_word`].
    pub fn parse(&self, src: &[u8], dst_rep: &mut Vec<u16>, dst_def: &mut Vec<u16>) {
        match self {
            Self::BOTH8(bits_to_shift, mask_to_apply) => {
                Self::parse_both::<1>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
            }
            Self::BOTH16(bits_to_shift, mask_to_apply) => {
                Self::parse_both::<2>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
            }
            Self::BOTH32(bits_to_shift, mask_to_apply) => {
                Self::parse_both::<4>(src, dst_rep, dst_def, *bits_to_shift, *mask_to_apply)
            }
            Self::REP8 => Self::parse_one::<1>(src, dst_rep),
            Self::REP16 => Self::parse_one::<2>(src, dst_rep),
            Self::REP32 => Self::parse_one::<4>(src, dst_rep),
            Self::DEF8 => Self::parse_one::<1>(src, dst_def),
            Self::DEF16 => Self::parse_one::<2>(src, dst_def),
            Self::DEF32 => Self::parse_one::<4>(src, dst_def),
            Self::NIL => {}
        }
    }

    /// Selects the parser for words that use `bits_rep` bits of repetition and
    /// `bits_def` bits of definition level.
    ///
    /// The word size is the smallest of 1, 2 or 4 bytes that holds
    /// `bits_rep + bits_def` bits.
    pub fn new(bits_rep: u8, bits_def: u8) -> Self {
        enum WordSize {
            One,
            Two,
            Four,
        }
        // Sum in a wider type so that large inputs cannot overflow `u8`.
        let total_bits = u16::from(bits_rep) + u16::from(bits_def);
        let word_size = if total_bits <= 8 {
            WordSize::One
        } else if total_bits <= 16 {
            WordSize::Two
        } else {
            WordSize::Four
        };
        // Build the definition mask directly in `u32`: a 16-bit definition
        // level (possible with 4-byte words) needs the mask 0xFFFF, which a
        // 16-bit `(1 << bits) - 1` cannot produce without overflowing.
        let def_mask = 1u32
            .checked_shl(u32::from(bits_def))
            .map_or(u32::MAX, |top_bit| top_bit - 1);
        match (bits_rep > 0, bits_def > 0, word_size) {
            (false, false, _) => Self::NIL,
            (false, true, WordSize::One) => Self::DEF8,
            (false, true, WordSize::Two) => Self::DEF16,
            (false, true, WordSize::Four) => Self::DEF32,
            (true, false, WordSize::One) => Self::REP8,
            (true, false, WordSize::Two) => Self::REP16,
            (true, false, WordSize::Four) => Self::REP32,
            (true, true, WordSize::One) => Self::BOTH8(bits_def, def_mask),
            (true, true, WordSize::Two) => Self::BOTH16(bits_def, def_mask),
            (true, true, WordSize::Four) => Self::BOTH32(bits_def, def_mask),
        }
    }
}
// Unit tests covering level serialization, unraveling, and control word
// round trips.
#[cfg(test)]
mod tests {
use arrow_buffer::{NullBuffer, OffsetBuffer, ScalarBuffer};
use crate::repdef::RepDefUnraveler;
use super::RepDefBuilder;
// Builds a null buffer from a slice of validity flags (`true` = valid).
fn validity(values: &[bool]) -> NullBuffer {
NullBuffer::from_iter(values.iter().copied())
}
// Builds a 32-bit offsets buffer from raw offsets.
fn offsets_32(values: &[i32]) -> OffsetBuffer<i32> {
OffsetBuffer::<i32>::new(ScalarBuffer::from_iter(values.iter().copied()))
}
// Builds a 64-bit offsets buffer from raw offsets.
fn offsets_64(values: &[i64]) -> OffsetBuffer<i64> {
OffsetBuffer::<i64>::new(ScalarBuffer::from_iter(values.iter().copied()))
}
// Two nested list layers with nulls at every level: serialize, check the
// levels, then unravel layer by layer.
#[test]
fn test_repdef() {
let mut builder = RepDefBuilder::default();
builder.add_validity_bitmap(validity(&[true, false, true]));
builder.add_offsets(offsets_64(&[0, 2, 3, 5]));
builder.add_validity_bitmap(validity(&[true, true, true, false, true]));
builder.add_offsets(offsets_64(&[0, 1, 3, 5, 7, 9]));
builder.add_validity_bitmap(validity(&[
true, true, true, false, false, false, true, true, false,
]));
let repdefs = RepDefBuilder::serialize(vec![builder]);
let rep = repdefs.repetition_levels.unwrap();
let def = repdefs.definition_levels.unwrap();
assert_eq!(vec![0, 0, 0, 3, 3, 2, 2, 0, 1], def);
assert_eq!(vec![2, 1, 0, 2, 0, 2, 0, 1, 0], rep);
let mut unraveler = RepDefUnraveler::new(Some(rep), Some(def));
// The innermost validity comes back with entry 6 null even though it was
// added as valid: it sits inside a list that is itself null.
assert_eq!(
unraveler.unravel_validity(),
Some(validity(&[
true, true, true, false, false, false, false, true, false
]))
);
assert_eq!(
unraveler.unravel_offsets::<i32>().unwrap().inner(),
offsets_32(&[0, 1, 3, 5, 7, 9]).inner()
);
assert_eq!(
unraveler.unravel_validity(),
Some(validity(&[true, true, false, false, true]))
);
assert_eq!(
unraveler.unravel_offsets::<i32>().unwrap().inner(),
offsets_32(&[0, 2, 3, 5]).inner()
);
assert_eq!(
unraveler.unravel_validity(),
Some(validity(&[true, false, true]))
);
}
// Same nesting as above but without any nulls: no definition levels are
// produced and every unraveled validity is `None`.
#[test]
fn test_repdef_all_valid() {
let mut builder = RepDefBuilder::default();
builder.add_no_null(3);
builder.add_offsets(offsets_64(&[0, 2, 3, 5]));
builder.add_no_null(5);
builder.add_offsets(offsets_64(&[0, 1, 3, 5, 7, 9]));
builder.add_no_null(9);
let repdefs = RepDefBuilder::serialize(vec![builder]);
let rep = repdefs.repetition_levels.unwrap();
assert!(repdefs.definition_levels.is_none());
assert_eq!(vec![2, 1, 0, 2, 0, 2, 0, 1, 0], rep);
let mut unraveler = RepDefUnraveler::new(Some(rep), None);
assert_eq!(unraveler.unravel_validity(), None);
assert_eq!(
unraveler.unravel_offsets::<i32>().unwrap().inner(),
offsets_32(&[0, 1, 3, 5, 7, 9]).inner()
);
assert_eq!(unraveler.unravel_validity(), None);
assert_eq!(
unraveler.unravel_offsets::<i32>().unwrap().inner(),
offsets_32(&[0, 2, 3, 5]).inner()
);
assert_eq!(unraveler.unravel_validity(), None);
}
// Validity layers only (no lists): no repetition levels are produced.
#[test]
fn test_repdef_no_rep() {
let mut builder = RepDefBuilder::default();
builder.add_no_null(3);
builder.add_validity_bitmap(validity(&[false, false, true, true, true]));
builder.add_validity_bitmap(validity(&[false, true, true, true, false]));
let repdefs = RepDefBuilder::serialize(vec![builder]);
assert!(repdefs.repetition_levels.is_none());
let def = repdefs.definition_levels.unwrap();
assert_eq!(vec![2, 2, 0, 0, 1], def);
let mut unraveler = RepDefUnraveler::new(None, Some(def));
assert_eq!(
unraveler.unravel_validity(),
Some(validity(&[false, false, true, true, false]))
);
assert_eq!(
unraveler.unravel_validity(),
Some(validity(&[false, false, true, true, true]))
);
assert_eq!(unraveler.unravel_validity(), None);
}
// Two builders serialized together must produce the same levels as the
// single builder in `test_repdef`.
#[test]
fn test_repdef_multiple_builders() {
let mut builder1 = RepDefBuilder::default();
builder1.add_validity_bitmap(validity(&[true]));
builder1.add_offsets(offsets_64(&[0, 2]));
builder1.add_validity_bitmap(validity(&[true, true]));
builder1.add_offsets(offsets_64(&[0, 1, 3]));
builder1.add_validity_bitmap(validity(&[true, true, true]));
let mut builder2 = RepDefBuilder::default();
builder2.add_validity_bitmap(validity(&[false, true]));
builder2.add_offsets(offsets_64(&[0, 1, 3]));
builder2.add_validity_bitmap(validity(&[true, false, true]));
builder2.add_offsets(offsets_64(&[0, 2, 4, 6]));
builder2.add_validity_bitmap(validity(&[false, false, false, true, true, false]));
let repdefs = RepDefBuilder::serialize(vec![builder1, builder2]);
let rep = repdefs.repetition_levels.unwrap();
let def = repdefs.definition_levels.unwrap();
assert_eq!(vec![2, 1, 0, 2, 0, 2, 0, 1, 0], rep);
assert_eq!(vec![0, 0, 0, 3, 3, 2, 2, 0, 1], def);
}
// Round-trips levels through the control word iterator and parser.
#[test]
fn test_control_words() {
// Encodes `rep`/`def` (an empty vector means "absent"), checks the
// reported word layout and the encoded bytes, then decodes and compares
// with the inputs.
fn check(
rep: Vec<u16>,
def: Vec<u16>,
expected_values: Vec<u8>,
expected_bytes_per_word: usize,
expected_bits_rep: u8,
expected_bits_def: u8,
) {
let num_vals = rep.len().max(def.len());
let max_rep = rep.iter().max().copied().unwrap_or(0);
let max_def = def.iter().max().copied().unwrap_or(0);
let in_rep = if rep.is_empty() {
None
} else {
Some(rep.clone())
};
let in_def = if def.is_empty() {
None
} else {
Some(def.clone())
};
let mut iter = super::build_control_word_iterator(in_rep, max_rep, in_def, max_def);
assert_eq!(iter.bytes_per_word(), expected_bytes_per_word);
assert_eq!(iter.bits_rep(), expected_bits_rep);
assert_eq!(iter.bits_def(), expected_bits_def);
let mut cw_vec = Vec::with_capacity(num_vals * iter.bytes_per_word());
for _ in 0..num_vals {
iter.append_next(&mut cw_vec);
}
assert_eq!(expected_values, cw_vec);
let parser = super::ControlWordParser::new(expected_bits_rep, expected_bits_def);
let mut rep_out = Vec::with_capacity(num_vals);
let mut def_out = Vec::with_capacity(num_vals);
// `chunks_exact` rejects a chunk size of zero, so skip the nilary case.
if expected_bytes_per_word > 0 {
for slice in cw_vec.chunks_exact(expected_bytes_per_word) {
parser.parse(slice, &mut rep_out, &mut def_out);
}
}
assert_eq!(rep, rep_out);
assert_eq!(def, def_out);
}
// Both levels, 4 + 4 bits: one byte per word.
let rep = vec![0_u16, 7, 3, 2, 9, 8, 12, 5];
let def = vec![5_u16, 3, 1, 2, 12, 15, 0, 2];
let expected = vec![
0b00000101, 0b01110011, 0b00110001, 0b00100010, 0b10011100, 0b10001111, 0b11000000, 0b01010010, ];
check(rep, def, expected, 1, 4, 4);
// Both levels, 4 + 5 bits: two little-endian bytes per word.
let rep = vec![0_u16, 7, 3, 2, 9, 8, 12, 5];
let def = vec![5_u16, 3, 1, 2, 12, 22, 0, 2];
let expected = vec![
0b00000101, 0b00000000, 0b11100011, 0b00000000, 0b01100001, 0b00000000, 0b01000010, 0b00000000, 0b00101100, 0b00000001, 0b00010110, 0b00000001, 0b10000000, 0b00000001, 0b10100010, 0b00000000, ];
check(rep, def, expected, 2, 4, 5);
// A single kind of level (repetition only, then definition only).
let levels = vec![0_u16, 7, 3, 2, 9, 8, 12, 5];
let expected = vec![
0b00000000, 0b00000111, 0b00000011, 0b00000010, 0b00001001, 0b00001000, 0b00001100, 0b00000101, ];
check(levels.clone(), Vec::default(), expected.clone(), 1, 4, 0);
check(Vec::default(), levels, expected, 1, 0, 4);
// No levels at all.
check(Vec::default(), Vec::default(), Vec::default(), 0, 0, 0);
}
}