use std::borrow::Borrow;
use bytes::{Buf, BufMut};
use prost::DecodeError;
use risinglight_proto::rowset::BlockStatistics;
use super::BlockBuilder;
use crate::array::Array;
/// Writes `value` to `buf` as a base-128 varint.
///
/// The value is emitted least-significant group first, 7 bits per byte. Every
/// byte except the last has its high bit (`0x80`) set to signal that more
/// bytes follow, so a `u32` occupies between one and five bytes.
pub fn encode_32<B>(mut value: u32, buf: &mut B)
where
    B: BufMut,
{
    // Emit continuation bytes while more than 7 significant bits remain.
    while value >= 0x80 {
        buf.put_u8((value as u8) | 0x80);
        value >>= 7;
    }
    // The final group fits in 7 bits, so the high bit stays clear.
    buf.put_u8(value as u8);
}
/// Decodes every varint-encoded `u32` contained in the current chunk of `buf`.
///
/// Only the slice returned by `buf.chunk()` is inspected. Decoding walks a
/// local view of that slice; `buf` itself is never advanced by this function.
///
/// # Errors
///
/// Returns `DecodeError` ("invalid varint") when the chunk is empty, and
/// propagates any error reported by `decode_u32_slice`.
pub fn decode_u32<B>(buf: &mut B) -> Result<Vec<u32>, DecodeError>
where
    B: Buf,
{
    let mut remaining = buf.chunk();
    if remaining.is_empty() {
        return Err(DecodeError::new("invalid varint"));
    }

    let mut values: Vec<u32> = Vec::new();
    // Keep decoding until the local view of the chunk is exhausted.
    while !remaining.is_empty() {
        let (value, consumed) = decode_u32_slice(remaining)?;
        values.push(value);
        remaining.advance(consumed);
    }
    Ok(values)
}
/// Decodes one base-128 varint `u32` from the front of `bytes`.
///
/// On success returns the decoded value and the number of bytes consumed
/// (between 1 and 5). Bytes after the varint are ignored.
///
/// # Errors
///
/// Returns `DecodeError` ("invalid varint") when:
/// * `bytes` is empty,
/// * the input ends before a terminating byte (high bit clear) is found, or
/// * the fifth byte has its continuation bit set or carries bits that do not
///   fit into a `u32` (anything above `0x0f`).
#[inline]
pub fn decode_u32_slice(bytes: &[u8]) -> Result<(u32, usize), DecodeError> {
    let mut value: u32 = 0;
    // A `u32` needs at most five 7-bit groups; `take(5)` also keeps every
    // access inside the slice, so truncated input ends the loop instead of
    // reading out of bounds.
    for (index, &byte) in bytes.iter().take(5).enumerate() {
        if index == 4 {
            // Only bits 28..=31 remain, so the last byte may hold at most
            // four payload bits and must not request a continuation.
            if byte > 0x0f {
                break;
            }
            return Ok((value | (u32::from(byte) << 28), 5));
        }
        value |= u32::from(byte & 0x7f) << (7 * index);
        if byte < 0x80 {
            return Ok((value, index + 1));
        }
    }
    Err(DecodeError::new("invalid varint"))
}
/// Run-length-encoding wrapper around another [`BlockBuilder`].
///
/// Consecutive equal items are collapsed into a run: only the first item of a
/// run is forwarded to the wrapped builder, while the length of each run is
/// tracked as a `u32`.
pub struct RleBlockBuilder<A: Array, B: BlockBuilder<A>>
where
    A::Item: PartialEq,
{
    /// Wrapped builder that receives the first item of every run.
    block_builder: B,
    /// Lengths of the runs that have already been closed.
    rle_counts: Vec<u32>,
    /// Owned copy of the item the current run consists of; `None` when that
    /// item is `None` or nothing has been appended yet.
    previous_value: Option<<A::Item as ToOwned>::Owned>,
    /// Length of the run currently being extended; `0` until the first append.
    cur_count: u32,
}
impl<A, B> RleBlockBuilder<A, B>
where
    A: Array,
    B: BlockBuilder<A>,
    A::Item: PartialEq,
{
    /// Creates an empty RLE builder that forwards run heads to `block_builder`.
    ///
    /// The new builder has no recorded runs, no remembered value and a run
    /// length of zero.
    pub fn new(block_builder: B) -> Self {
        let rle_counts = Vec::new();
        let previous_value = None;
        let cur_count = 0;
        Self {
            block_builder,
            rle_counts,
            previous_value,
            cur_count,
        }
    }
}
impl<A, B> BlockBuilder<A> for RleBlockBuilder<A, B>
where
    A: Array,
    B: BlockBuilder<A>,
    A::Item: PartialEq,
{
    /// Appends one item, either extending the current run or opening a new one.
    ///
    /// The wrapped builder only receives the first item of each run. A run is
    /// also cut once its length reaches `u32::MAX`, so the counter never
    /// overflows.
    fn append(&mut self, item: Option<&A::Item>) {
        let extends_current_run = self.cur_count != 0 && self.cur_count != u32::MAX && {
            let previous: Option<&A::Item> =
                self.previous_value.as_ref().map(|owned| owned.borrow());
            item == previous
        };
        if extends_current_run {
            self.cur_count += 1;
            return;
        }

        // A new run starts here; record the length of the one it replaces
        // (there is none before the very first append).
        if self.cur_count != 0 {
            self.rle_counts.push(self.cur_count);
        }
        self.previous_value = item.map(|value| value.to_owned());
        self.block_builder.append(item);
        self.cur_count = 1;
    }

    /// Estimates the encoded size in bytes.
    ///
    /// Adds to the wrapped builder's estimate a fixed `size_of::<u32>()` plus
    /// `size_of::<u16>()` for every run, counting the run still in progress.
    fn estimated_size(&self) -> usize {
        let per_run = std::mem::size_of::<u16>();
        let fixed = std::mem::size_of::<u32>();
        let open_runs = usize::from(self.cur_count != 0);
        self.block_builder.estimated_size()
            + (self.rle_counts.len() + open_runs) * per_run
            + fixed
    }

    /// Delegates the decision to the wrapped builder.
    fn should_finish(&self, next_item: &Option<&A::Item>) -> bool {
        self.block_builder.should_finish(next_item)
    }

    /// Returns the statistics reported by the wrapped builder.
    fn get_statistics(&self) -> Vec<BlockStatistics> {
        self.block_builder.get_statistics()
    }

    /// Serializes the block.
    ///
    /// Returns an empty vector when nothing was appended. Otherwise the layout
    /// is: number of runs (`u32`, little endian), byte length of the varint
    /// section (`u32`, little endian), the run lengths as varints, and finally
    /// the bytes produced by the wrapped builder.
    fn finish(mut self) -> Vec<u8> {
        if self.cur_count == 0 {
            return Vec::new();
        }
        // Close the run that is still open.
        self.rle_counts.push(self.cur_count);

        // Encode the run lengths first so their total byte length is known
        // before the header is written.
        let mut varints: Vec<u8> = Vec::new();
        for &run_length in &self.rle_counts {
            encode_32(run_length, &mut varints);
        }

        let mut encoded_data: Vec<u8> = Vec::new();
        encoded_data.put_u32_le(self.rle_counts.len() as u32);
        encoded_data.put_u32_le(varints.len() as u32);
        encoded_data.extend(varints);
        encoded_data.extend(self.block_builder.finish());
        encoded_data
    }

    /// Returns the target size of the wrapped builder.
    fn get_target_size(&self) -> usize {
        self.block_builder.get_target_size()
    }
}
#[cfg(test)]
mod tests {
    use itertools::Itertools;

    use super::super::{
        NullableBlockBuilder, PlainBlobBlockBuilder, PlainCharBlockBuilder,
        PlainPrimitiveBlockBuilder,
    };
    use super::*;
    use crate::array::{I32Array, Utf8Array};

    /// Three runs of 30 equal `i32` values collapse into three stored items.
    #[test]
    fn test_build_rle_primitive_i32() {
        let mut rle_builder = RleBlockBuilder::<I32Array, PlainPrimitiveBlockBuilder<i32>>::new(
            PlainPrimitiveBlockBuilder::new(14),
        );
        for value in 1..=3 {
            for _ in 0..30 {
                rle_builder.append(Some(&value));
            }
        }
        assert_eq!(rle_builder.estimated_size(), 4 * 3 + 2 * 3 + 4);
        assert!(rle_builder.should_finish(&Some(&3)));
        assert!(rle_builder.should_finish(&Some(&4)));
        rle_builder.finish();
    }

    /// Alternating null / non-null runs followed by one run longer than `u16::MAX`.
    #[test]
    fn test_build_rle_primitive_nullable_i32() {
        let inner_builder = PlainPrimitiveBlockBuilder::<i32>::new(48);
        let builder = NullableBlockBuilder::new(inner_builder, 48);
        let mut rle_builder = RleBlockBuilder::new(builder);
        for value in 1..=4 {
            for _ in 0..30 {
                rle_builder.append(None);
            }
            for _ in 0..30 {
                rle_builder.append(Some(&value));
            }
        }
        for _ in 0..30 {
            rle_builder.append(None);
        }
        for _ in 0..(u16::MAX as usize + 1) {
            rle_builder.append(Some(&5));
        }
        rle_builder.finish();
    }

    /// Fixed-width char column: three runs of 30 equal strings.
    #[test]
    fn test_build_rle_char() {
        let mut rle_builder = RleBlockBuilder::<Utf8Array, PlainCharBlockBuilder>::new(
            PlainCharBlockBuilder::new(120, 40),
        );
        let width_40_char = ["2"].iter().cycle().take(40).join("");
        for text in ["233", "2333", &width_40_char[..]] {
            for _ in 0..30 {
                rle_builder.append(Some(text));
            }
        }
        assert_eq!(rle_builder.estimated_size(), 40 * 3 + 2 * 3 + 4);
        assert!(rle_builder.should_finish(&Some(&width_40_char[..])));
        assert!(rle_builder.should_finish(&Some("2333333")));
        rle_builder.finish();
    }

    /// Variable-width string column: three runs of 30 equal strings.
    #[test]
    fn test_build_rle_varchar() {
        let mut rle_builder = RleBlockBuilder::<Utf8Array, PlainBlobBlockBuilder<str>>::new(
            PlainBlobBlockBuilder::new(30),
        );
        for text in ["233", "23333", "2333333"] {
            for _ in 0..30 {
                rle_builder.append(Some(text));
            }
        }
        assert_eq!(rle_builder.estimated_size(), 15 + 4 * 3 + 2 * 3 + 4);
        assert!(rle_builder.should_finish(&Some("2333333")));
        assert!(rle_builder.should_finish(&Some("23333333")));
        rle_builder.finish();
    }
}