use super::{V2_COMPRESSED_COOKIE, V2_COOKIE};
use super::super::{Counter, Histogram, RestatState};
use super::super::num::ToPrimitive;
use std::io::{self, Cursor, ErrorKind, Read};
use std::marker::PhantomData;
use std;
use super::byteorder::{BigEndian, ReadBytesExt};
use super::flate2::read::ZlibDecoder;
#[derive(Debug, PartialEq, Eq, Clone, Copy)]
pub enum DeserializeError {
IoError(ErrorKind),
InvalidCookie,
UnsupportedFeature,
UnsuitableCounterType,
InvalidParameters,
UsizeTypeTooSmall,
EncodedArrayTooLong,
}
impl std::convert::From<std::io::Error> for DeserializeError {
fn from(e: std::io::Error) -> Self {
DeserializeError::IoError(e.kind())
}
}
pub struct Deserializer {
payload_buf: Vec<u8>,
}
impl Default for Deserializer {
fn default() -> Self {
Self::new()
}
}
impl Deserializer {
pub fn new() -> Deserializer {
Deserializer {
payload_buf: Vec::new(),
}
}
pub fn deserialize<T: Counter, R: Read>(
&mut self,
reader: &mut R,
) -> Result<Histogram<T>, DeserializeError> {
let cookie = reader.read_u32::<BigEndian>()?;
match cookie {
V2_COOKIE => self.deser_v2(reader),
V2_COMPRESSED_COOKIE => self.deser_v2_compressed(reader),
_ => Err(DeserializeError::InvalidCookie),
}
}
fn deser_v2_compressed<T: Counter, R: Read>(
&mut self,
reader: &mut R,
) -> Result<Histogram<T>, DeserializeError> {
let payload_len = reader
.read_u32::<BigEndian>()?
.to_usize()
.ok_or(DeserializeError::UsizeTypeTooSmall)?;
let mut deflate_reader = ZlibDecoder::new(reader.take(payload_len as u64));
let inner_cookie = deflate_reader.read_u32::<BigEndian>()?;
if inner_cookie != V2_COOKIE {
return Err(DeserializeError::InvalidCookie);
}
self.deser_v2(&mut deflate_reader)
}
#[cfg_attr(feature = "cargo-clippy", allow(float_cmp))]
fn deser_v2<T: Counter, R: Read>(
&mut self,
reader: &mut R,
) -> Result<Histogram<T>, DeserializeError> {
let payload_len = reader
.read_u32::<BigEndian>()?
.to_usize()
.ok_or(DeserializeError::UsizeTypeTooSmall)?;
let normalizing_offset = reader.read_u32::<BigEndian>()?;
if normalizing_offset != 0 {
return Err(DeserializeError::UnsupportedFeature);
}
let num_digits = reader
.read_u32::<BigEndian>()?
.to_u8()
.ok_or(DeserializeError::InvalidParameters)?;
let low = reader.read_u64::<BigEndian>()?;
let high = reader.read_u64::<BigEndian>()?;
let int_double_ratio = reader.read_f64::<BigEndian>()?;
if int_double_ratio != 1.0 {
return Err(DeserializeError::UnsupportedFeature);
}
let mut h = Histogram::new_with_bounds(low, high, num_digits)
.map_err(|_| DeserializeError::InvalidParameters)?;
if payload_len > self.payload_buf.len() {
self.payload_buf.resize(payload_len, 0);
}
let mut payload_slice = &mut self.payload_buf[0..payload_len];
reader.read_exact(&mut payload_slice)?;
let mut payload_index: usize = 0;
let mut restat_state = RestatState::new();
let mut decode_state = DecodeLoopState::new();
while payload_index < payload_len.saturating_sub(9) {
let (zz_num, bytes_read) =
varint_read_slice(&payload_slice[payload_index..(payload_index + 9)]);
payload_index += bytes_read;
let count_or_zeros = zig_zag_decode(zz_num);
decode_state.on_decoded_num(count_or_zeros, &mut restat_state, &mut h)?;
}
let leftover_slice = &payload_slice[payload_index..];
let mut cursor = Cursor::new(&leftover_slice);
while cursor.position() < leftover_slice.len() as u64 {
let count_or_zeros = zig_zag_decode(varint_read(&mut cursor)?);
decode_state.on_decoded_num(count_or_zeros, &mut restat_state, &mut h)?;
}
restat_state.update_histogram(&mut h);
Ok(h)
}
}
#[inline]
pub fn varint_read_slice(slice: &[u8]) -> (u64, usize) {
let mut b = slice[0];
let mut value: u64 = low_7_bits(b);
if !is_high_bit_set(b) {
return (value, 1);
}
b = slice[1];
value |= low_7_bits(b) << 7;
if !is_high_bit_set(b) {
return (value, 2);
}
b = slice[2];
value |= low_7_bits(b) << (7 * 2);
if !is_high_bit_set(b) {
return (value, 3);
}
b = slice[3];
value |= low_7_bits(b) << (7 * 3);
if !is_high_bit_set(b) {
return (value, 4);
}
b = slice[4];
value |= low_7_bits(b) << (7 * 4);
if !is_high_bit_set(b) {
return (value, 5);
}
b = slice[5];
value |= low_7_bits(b) << (7 * 5);
if !is_high_bit_set(b) {
return (value, 6);
}
b = slice[6];
value |= low_7_bits(b) << (7 * 6);
if !is_high_bit_set(b) {
return (value, 7);
}
b = slice[7];
value |= low_7_bits(b) << (7 * 7);
if !is_high_bit_set(b) {
return (value, 8);
}
b = slice[8];
value |= u64::from(b) << (7 * 8);
(value, 9)
}
pub fn varint_read<R: Read>(reader: &mut R) -> io::Result<u64> {
let mut b = reader.read_u8()?;
let mut value: u64 = low_7_bits(b);
if is_high_bit_set(b) {
b = reader.read_u8()?;
value |= low_7_bits(b) << 7;
if is_high_bit_set(b) {
b = reader.read_u8()?;
value |= low_7_bits(b) << (7 * 2);
if is_high_bit_set(b) {
b = reader.read_u8()?;
value |= low_7_bits(b) << (7 * 3);
if is_high_bit_set(b) {
b = reader.read_u8()?;
value |= low_7_bits(b) << (7 * 4);
if is_high_bit_set(b) {
b = reader.read_u8()?;
value |= low_7_bits(b) << (7 * 5);
if is_high_bit_set(b) {
b = reader.read_u8()?;
value |= low_7_bits(b) << (7 * 6);
if is_high_bit_set(b) {
b = reader.read_u8()?;
value |= low_7_bits(b) << (7 * 7);
if is_high_bit_set(b) {
b = reader.read_u8()?;
value |= u64::from(b) << (7 * 8);
}
}
}
}
}
}
}
}
Ok(value)
}
#[inline]
fn low_7_bits(b: u8) -> u64 {
u64::from(b & 0x7F)
}
#[inline]
fn is_high_bit_set(b: u8) -> bool {
(b & 0x80) != 0
}
#[inline]
pub fn zig_zag_decode(encoded: u64) -> i64 {
((encoded >> 1) as i64) ^ -((encoded & 1) as i64)
}
struct DecodeLoopState<T: Counter> {
dest_index: usize,
phantom: PhantomData<T>,
}
impl<T: Counter> DecodeLoopState<T> {
fn new() -> DecodeLoopState<T> {
DecodeLoopState {
dest_index: 0,
phantom: PhantomData,
}
}
#[inline]
fn on_decoded_num(
&mut self,
count_or_zeros: i64,
restat_state: &mut RestatState<T>,
h: &mut Histogram<T>,
) -> Result<(), DeserializeError> {
if count_or_zeros < 0 {
let zero_count = (-count_or_zeros)
.to_usize()
.ok_or(DeserializeError::UsizeTypeTooSmall)?;
self.dest_index = self.dest_index
.checked_add(zero_count)
.ok_or(DeserializeError::UsizeTypeTooSmall)?;
} else {
let count: T =
T::from_i64(count_or_zeros).ok_or(DeserializeError::UnsuitableCounterType)?;
if count > T::zero() {
h.set_count_at_index(self.dest_index, count)
.map_err(|_| DeserializeError::EncodedArrayTooLong)?;
restat_state.on_nonzero_count(self.dest_index, count);
}
self.dest_index = self.dest_index
.checked_add(1)
.ok_or(DeserializeError::UsizeTypeTooSmall)?;
}
Ok(())
}
}