use std::mem;
use super::rle::RleEncoder;
use crate::data_type::AsBytes;
use crate::util::bit_util::num_required_bits;
/// Encoder for `i16` level values backed by an [`RleEncoder`].
///
/// Both variants wrap the same encoder type and accept values the same way;
/// they differ only in how the finished bytes are framed (see `consume` and
/// `flush_to` in the `impl` block of this file).
pub enum LevelEncoder {
/// Output starts with a 4-byte header: the constructor reserves
/// `size_of::<i32>()` bytes and `consume` / `flush_to` overwrite them with
/// the length of the bytes that follow, as a little-endian `i32`.
Rle(RleEncoder),
/// Output is the encoder's bytes as-is, with no length header.
RleV2(RleEncoder),
}
impl LevelEncoder {
    /// Builds a [`LevelEncoder::Rle`] encoder whose bit width is derived from `max_level`.
    ///
    /// The backing buffer starts out holding `size_of::<i32>()` zero bytes. They act as a
    /// placeholder that [`Self::consume`] and [`Self::flush_to`] later overwrite with the
    /// length of the encoded payload.
    pub fn v1_streaming(max_level: i16) -> Self {
        let width = num_required_bits(max_level as u64);
        let header_placeholder = vec![0u8; mem::size_of::<i32>()];
        LevelEncoder::Rle(RleEncoder::new_from_buf(width, header_placeholder))
    }

    /// Builds a [`LevelEncoder::RleV2`] encoder whose bit width is derived from `max_level`.
    ///
    /// The backing buffer starts empty; this variant writes no length header.
    pub fn v2_streaming(max_level: i16) -> Self {
        let width = num_required_bits(max_level as u64);
        LevelEncoder::RleV2(RleEncoder::new_from_buf(width, Vec::new()))
    }

    /// Encodes every value of `buffer` and reports the encoded groups to `observer`.
    ///
    /// Each value is first handed to the encoder individually. If the encoder then reports
    /// (via `is_accumulating_rle`) that it is accumulating a run of that value, all
    /// immediately following equal values are added with one `extend_run` call and
    /// `observer(value, 1 + run_len)` is invoked once for the whole group. Otherwise
    /// `observer(value, 1)` is invoked for that single value.
    ///
    /// Returns `buffer.len()`.
    #[inline]
    pub fn put_with_observer<F>(&mut self, buffer: &[i16], mut observer: F) -> usize
    where
        F: FnMut(i16, usize),
    {
        let encoder = match self {
            LevelEncoder::Rle(inner) | LevelEncoder::RleV2(inner) => inner,
        };

        let mut pos = 0;
        while pos < buffer.len() {
            let level = buffer[pos];
            encoder.put(level as u64);

            // Number of input values covered by this step (always at least the one just put).
            let consumed = if encoder.is_accumulating_rle(level as u64) {
                // Count the equal values that directly follow and add them in one call.
                let repeats = buffer[pos + 1..]
                    .iter()
                    .take_while(|&&next| next == level)
                    .count();
                if repeats > 0 {
                    encoder.extend_run(repeats);
                }
                1 + repeats
            } else {
                1
            };

            observer(level, consumed);
            pos += consumed;
        }

        buffer.len()
    }

    /// Encodes `count` copies of `value` and calls `observer(value, count)` exactly once.
    ///
    /// Values are put one at a time until the encoder reports (via `is_accumulating_rle`)
    /// that it is accumulating a run of `value`; whatever is left of `count` is then added
    /// with a single `extend_run` call. The observer is invoked even when `count` is zero.
    #[inline]
    pub fn put_n_with_observer<F>(&mut self, value: i16, count: usize, mut observer: F)
    where
        F: FnMut(i16, usize),
    {
        let encoder = match self {
            LevelEncoder::Rle(inner) | LevelEncoder::RleV2(inner) => inner,
        };
        let level = value as u64;

        // Feed single values until the encoder is ready to take the rest in bulk.
        let mut fed = 0;
        while fed < count && !encoder.is_accumulating_rle(level) {
            encoder.put(level);
            fed += 1;
        }

        let bulk = count - fed;
        if bulk > 0 {
            encoder.extend_run(bulk);
        }

        observer(value, count);
    }

    /// Finishes the encoder and returns the encoded bytes.
    ///
    /// For [`LevelEncoder::RleV2`] the encoder's bytes are returned unchanged. For
    /// [`LevelEncoder::Rle`] the first `size_of::<i32>()` bytes are overwritten with the
    /// length of the remaining bytes, stored as a little-endian `i32`.
    ///
    /// Within this file only the unit tests call this method, hence `#[allow(unused)]`;
    /// callers elsewhere, if any, are not visible from here.
    #[inline]
    #[allow(unused)]
    pub fn consume(self) -> Vec<u8> {
        let encoder = match self {
            // No header to patch: hand the bytes straight back.
            LevelEncoder::RleV2(inner) => return inner.consume(),
            LevelEncoder::Rle(inner) => inner,
        };

        let mut bytes = encoder.consume();
        let payload_len = bytes.len() - mem::size_of::<i32>();
        // `to_le` fixes the byte order so the native-order `as_bytes` view is little-endian.
        let header = (payload_len as i32).to_le();
        let header_bytes = header.as_bytes();
        bytes[..header_bytes.len()].copy_from_slice(header_bytes);
        bytes
    }

    /// Flushes the encoder, passes the encoded bytes to `f`, then resets the encoder.
    ///
    /// For [`LevelEncoder::Rle`] the first 4 bytes of the flushed buffer are overwritten
    /// with the length of the remaining bytes (little-endian `i32`) before `f` runs, and
    /// after the reset `size_of::<i32>()` bytes are skipped again for the next header.
    /// For [`LevelEncoder::RleV2`] the bytes are passed as-is and the encoder is cleared.
    ///
    /// Returns whatever `f` returns.
    #[inline]
    pub fn flush_to<F, R>(&mut self, f: F) -> R
    where
        F: FnOnce(&[u8]) -> R,
    {
        match self {
            LevelEncoder::Rle(encoder) => {
                let header_len = mem::size_of::<i32>();
                let bytes = encoder.flush_buffer_mut();
                let payload_len = (bytes.len() - header_len) as i32;
                bytes[..header_len].copy_from_slice(&payload_len.to_le_bytes());
                let output = f(bytes);

                // Start over and re-reserve room for the next length header.
                encoder.clear();
                encoder.skip(header_len);
                output
            }
            LevelEncoder::RleV2(encoder) => {
                let output = f(encoder.flush_buffer());
                encoder.clear();
                output
            }
        }
    }
}
#[cfg(test)]
mod tests {
    use super::*;

    /// Encodes `values` with one `put_with_observer` call on a fresh v2 streaming encoder
    /// and returns the finished bytes. Used as the expected output in the tests below.
    fn reference_encode(max_level: i16, values: &[i16]) -> Vec<u8> {
        let mut encoder = LevelEncoder::v2_streaming(max_level);
        let _ = encoder.put_with_observer(values, |_level, _count| {});
        encoder.consume()
    }

    /// Encodes one `put_n_with_observer(value, count)` call on a fresh v2 streaming
    /// encoder and returns the finished bytes.
    fn encode_run(max_level: i16, value: i16, count: usize) -> Vec<u8> {
        let mut encoder = LevelEncoder::v2_streaming(max_level);
        encoder.put_n_with_observer(value, count, |_level, _count| {});
        encoder.consume()
    }

    // A long run must produce the same bytes as encoding the expanded slice.
    #[test]
    fn test_put_n_with_observer_large_run() {
        const MAX_LEVEL: i16 = 3;
        const RUN: usize = 10_000;
        let actual = encode_run(MAX_LEVEL, 2, RUN);
        let expanded = vec![2i16; RUN];
        assert_eq!(actual, reference_encode(MAX_LEVEL, &expanded));
    }

    // A short run of five values.
    #[test]
    fn test_put_n_with_observer_small_count() {
        const MAX_LEVEL: i16 = 3;
        let actual = encode_run(MAX_LEVEL, 1, 5);
        assert_eq!(actual, reference_encode(MAX_LEVEL, &[1i16; 5]));
    }

    // A run of exactly eight values.
    #[test]
    fn test_put_n_with_observer_exact_threshold() {
        const MAX_LEVEL: i16 = 3;
        let actual = encode_run(MAX_LEVEL, 3, 8);
        assert_eq!(actual, reference_encode(MAX_LEVEL, &[3i16; 8]));
    }

    // A single value with the smallest non-zero max level.
    #[test]
    fn test_put_n_with_observer_single_value() {
        const MAX_LEVEL: i16 = 1;
        let actual = encode_run(MAX_LEVEL, 1, 1);
        assert_eq!(actual, reference_encode(MAX_LEVEL, &[1i16]));
    }

    // A zero count must encode the same bytes as an empty slice.
    #[test]
    fn test_put_n_with_observer_zero_count() {
        const MAX_LEVEL: i16 = 3;
        let actual = encode_run(MAX_LEVEL, 2, 0);
        let nothing: [i16; 0] = [];
        assert_eq!(actual, reference_encode(MAX_LEVEL, &nothing));
    }

    // The observer sees the whole run in one call, not one call per value.
    #[test]
    fn test_put_n_with_observer_calls_observer_exactly_once() {
        let mut encoder = LevelEncoder::v2_streaming(3);
        let mut seen: Vec<(i16, usize)> = Vec::new();
        encoder.put_n_with_observer(2, 500, |level, run| seen.push((level, run)));
        assert_eq!(seen, vec![(2, 500)]);
    }

    // The observer is still invoked, with a count of zero, when nothing is encoded.
    #[test]
    fn test_put_n_with_observer_zero_count_calls_observer() {
        let mut encoder = LevelEncoder::v2_streaming(3);
        let mut seen: Vec<(i16, usize)> = Vec::new();
        encoder.put_n_with_observer(1, 0, |level, run| seen.push((level, run)));
        assert_eq!(seen, vec![(1, 0)]);
    }

    // Two back-to-back runs of different values: one observer call each, and the same
    // bytes as encoding the expanded sequence.
    #[test]
    fn test_put_n_with_observer_followed_by_different_value() {
        const MAX_LEVEL: i16 = 3;
        let mut encoder = LevelEncoder::v2_streaming(MAX_LEVEL);
        let mut seen: Vec<(i16, usize)> = Vec::new();
        encoder.put_n_with_observer(1, 100, |level, run| seen.push((level, run)));
        encoder.put_n_with_observer(3, 200, |level, run| seen.push((level, run)));
        assert_eq!(seen, vec![(1, 100), (3, 200)]);

        // 100 ones followed by 200 threes.
        let mut expanded = vec![1i16; 100];
        expanded.resize(300, 3);
        let expected = reference_encode(MAX_LEVEL, &expanded);
        assert_eq!(encoder.consume(), expected);
    }

    // Mixing bulk runs with slice-based puts must match encoding the flattened input.
    #[test]
    fn test_put_n_with_observer_interleaved_with_put_with_observer() {
        const MAX_LEVEL: i16 = 3;
        let middle = [0i16, 0, 1, 1, 3];

        let mut encoder = LevelEncoder::v2_streaming(MAX_LEVEL);
        encoder.put_n_with_observer(2, 50, |_level, _count| {});
        encoder.put_with_observer(&middle, |_level, _count| {});
        encoder.put_n_with_observer(2, 50, |_level, _count| {});

        let mut flattened = vec![2i16; 50];
        flattened.extend_from_slice(&middle);
        flattened.extend_from_slice(&[2i16; 50]);
        assert_eq!(encoder.consume(), reference_encode(MAX_LEVEL, &flattened));
    }

    // Same equivalence check for the v1 (length-prefixed) variant.
    #[test]
    fn test_put_n_with_observer_v1_roundtrip() {
        const MAX_LEVEL: i16 = 3;

        let mut bulk = LevelEncoder::v1_streaming(MAX_LEVEL);
        bulk.put_n_with_observer(2, 1000, |_level, _count| {});

        let mut sliced = LevelEncoder::v1_streaming(MAX_LEVEL);
        sliced.put_with_observer(&[2i16; 1000], |_level, _count| {});

        assert_eq!(bulk.consume(), sliced.consume());
    }
}