use super::{Serializer, V2_COOKIE, V2_HEADER_SIZE};
use crate::{Counter, Histogram};
use byteorder::{BigEndian, WriteBytesExt};
use std::io::{self, Write};
use std::{error, fmt};
#[derive(Debug)]
pub enum V2SerializeError {
CountNotSerializable,
UsizeTypeTooSmall,
IoError(io::Error),
}
impl std::convert::From<std::io::Error> for V2SerializeError {
fn from(e: std::io::Error) -> Self {
V2SerializeError::IoError(e)
}
}
impl fmt::Display for V2SerializeError {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
match self {
V2SerializeError::CountNotSerializable => write!(
f,
"A count above i64::max_value() cannot be zig-zag encoded"
),
V2SerializeError::UsizeTypeTooSmall => {
write!(f, "Internal calculations cannot be represented in `usize`")
}
V2SerializeError::IoError(e) => write!(f, "An i/o operation failed: {}", e),
}
}
}
impl error::Error for V2SerializeError {
fn source(&self) -> Option<&(dyn error::Error + 'static)> {
match self {
V2SerializeError::IoError(e) => Some(e),
_ => None,
}
}
}
pub struct V2Serializer {
buf: Vec<u8>,
}
impl Default for V2Serializer {
fn default() -> Self {
Self::new()
}
}
impl V2Serializer {
pub fn new() -> V2Serializer {
V2Serializer { buf: Vec::new() }
}
}
impl Serializer for V2Serializer {
type SerializeError = V2SerializeError;
fn serialize<T: Counter, W: Write>(
&mut self,
h: &Histogram<T>,
writer: &mut W,
) -> Result<usize, V2SerializeError> {
self.buf.clear();
let max_size = max_encoded_size(h).ok_or(V2SerializeError::UsizeTypeTooSmall)?;
self.buf.reserve(max_size);
self.buf.write_u32::<BigEndian>(V2_COOKIE)?;
self.buf.write_u32::<BigEndian>(0)?;
self.buf.write_u32::<BigEndian>(0)?;
self.buf
.write_u32::<BigEndian>(u32::from(h.significant_value_digits))?;
self.buf
.write_u64::<BigEndian>(h.lowest_discernible_value)?;
self.buf.write_u64::<BigEndian>(h.highest_trackable_value)?;
self.buf.write_f64::<BigEndian>(1.0)?;
debug_assert_eq!(V2_HEADER_SIZE, self.buf.len());
unsafe {
self.buf.set_len(max_size);
}
let counts_len = encode_counts(h, &mut self.buf[V2_HEADER_SIZE..])?;
let total_len = V2_HEADER_SIZE + counts_len;
(&mut self.buf[4..8]).write_u32::<BigEndian>(counts_len as u32)?;
writer
.write_all(&self.buf[0..(total_len)])
.map(|_| total_len)
.map_err(V2SerializeError::IoError)
}
}
fn max_encoded_size<T: Counter>(h: &Histogram<T>) -> Option<usize> {
h.index_for(h.max())
.and_then(|i| counts_array_max_encoded_size(i + 1))
.and_then(|x| x.checked_add(V2_HEADER_SIZE))
}
pub fn counts_array_max_encoded_size(length: usize) -> Option<usize> {
length.checked_mul(9)
}
pub fn encode_counts<T: Counter>(
h: &Histogram<T>,
buf: &mut [u8],
) -> Result<usize, V2SerializeError> {
let index_limit = h
.index_for(h.max())
.expect("Index for max value must exist");
let mut index = 0;
let mut bytes_written = 0;
assert!(index_limit <= h.counts.len());
while index <= index_limit {
let count = unsafe { *(h.counts.get_unchecked(index)) };
index += 1;
let mut zero_count = 0;
if count == T::zero() {
zero_count = 1;
while (index <= index_limit)
&& (unsafe { *(h.counts.get_unchecked(index)) } == T::zero())
{
zero_count += 1;
index += 1;
}
}
let count_or_zeros: i64 = if zero_count > 1 {
-zero_count
} else {
count
.to_i64()
.ok_or(V2SerializeError::CountNotSerializable)?
};
let zz = zig_zag_encode(count_or_zeros);
bytes_written += varint_write(zz, &mut buf[bytes_written..]);
}
Ok(bytes_written)
}
#[inline]
pub fn varint_write(input: u64, buf: &mut [u8]) -> usize {
if shift_by_7s(input, 1) == 0 {
buf[0] = input as u8;
return 1;
}
buf[0] = 0x80 | ((input & 0x7F) as u8);
if shift_by_7s(input, 2) == 0 {
buf[1] = shift_by_7s(input, 1) as u8;
return 2;
}
buf[1] = nth_7b_chunk_with_high_bit(input, 1);
if shift_by_7s(input, 3) == 0 {
buf[2] = shift_by_7s(input, 2) as u8;
return 3;
}
buf[2] = nth_7b_chunk_with_high_bit(input, 2);
if shift_by_7s(input, 4) == 0 {
buf[3] = shift_by_7s(input, 3) as u8;
return 4;
}
buf[3] = nth_7b_chunk_with_high_bit(input, 3);
if shift_by_7s(input, 5) == 0 {
buf[4] = shift_by_7s(input, 4) as u8;
return 5;
}
buf[4] = nth_7b_chunk_with_high_bit(input, 4);
if shift_by_7s(input, 6) == 0 {
buf[5] = shift_by_7s(input, 5) as u8;
return 6;
}
buf[5] = nth_7b_chunk_with_high_bit(input, 5);
if shift_by_7s(input, 7) == 0 {
buf[6] = shift_by_7s(input, 6) as u8;
return 7;
}
buf[6] = nth_7b_chunk_with_high_bit(input, 6);
if shift_by_7s(input, 8) == 0 {
buf[7] = shift_by_7s(input, 7) as u8;
return 8;
}
buf[7] = nth_7b_chunk_with_high_bit(input, 7);
buf[8] = (input >> 56) as u8;
9
}
#[inline]
fn shift_by_7s(input: u64, n: u8) -> u64 {
input >> (7 * n)
}
#[inline]
fn nth_7b_chunk_with_high_bit(input: u64, n: u8) -> u8 {
(shift_by_7s(input, n) as u8) | 0x80
}
#[inline]
pub fn zig_zag_encode(num: i64) -> u64 {
((num << 1) ^ (num >> 63)) as u64
}