1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
use super::v2_serializer::{V2SerializeError, V2Serializer};
use super::{Serializer, V2_COMPRESSED_COOKIE};
use crate::core::counter::Counter;
use crate::Histogram;
use byteorder::{BigEndian, WriteBytesExt};
use flate2::write::ZlibEncoder;
use flate2::Compression;
use std::io::{self, Write};
use std::{self, error, fmt};

/// Errors that occur during serialization.
///
/// This is the `SerializeError` type of the `Serializer` implementation for
/// `V2DeflateSerializer`.
#[derive(Debug)]
pub enum V2DeflateSerializeError {
    /// The underlying serialization failed: the wrapped `V2Serializer` returned an error while
    /// producing the uncompressed encoding.
    InternalSerializationError(V2SerializeError),
    /// An i/o operation failed. Any `io::Error` is converted into this variant through the
    /// `From<io::Error>` impl, which is what lets `?` be used on i/o calls during serialization.
    IoError(io::Error),
}

impl std::convert::From<std::io::Error> for V2DeflateSerializeError {
    fn from(e: std::io::Error) -> Self {
        V2DeflateSerializeError::IoError(e)
    }
}

impl fmt::Display for V2DeflateSerializeError {
    /// Writes a one-line description naming which layer failed, followed by the wrapped error's
    /// own `Display` output.
    fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
        match self {
            V2DeflateSerializeError::InternalSerializationError(e) => {
                write!(f, "The underlying serialization failed: {}", e)
            }
            // This arm used to repeat the message of the arm above, so an i/o failure was
            // reported as a serialization failure. Describe it as what it is.
            V2DeflateSerializeError::IoError(e) => {
                write!(f, "An i/o operation failed: {}", e)
            }
        }
    }
}

impl error::Error for V2DeflateSerializeError {
    /// Exposes the wrapped error as the cause; every variant carries one, so this never
    /// returns `None`.
    fn source(&self) -> Option<&(dyn error::Error + 'static)> {
        let cause: &(dyn error::Error + 'static) = match self {
            V2DeflateSerializeError::InternalSerializationError(inner) => inner,
            V2DeflateSerializeError::IoError(io_err) => io_err,
        };
        Some(cause)
    }
}

/// Serializer for the V2 + DEFLATE binary format.
///
/// It's called "deflate" to stay consistent with the naming used in the Java implementation, but
/// it actually uses zlib's wrapper format around plain DEFLATE.
///
/// The serializer owns its scratch buffers; `serialize` clears them at the start of each call
/// rather than allocating new ones.
pub struct V2DeflateSerializer {
    // Receives the plain (uncompressed) V2 encoding produced by `v2_serializer`.
    uncompressed_buf: Vec<u8>,
    // Holds the cookie, the length field and the zlib-compressed payload before the whole thing
    // is written to the caller's writer.
    compressed_buf: Vec<u8>,
    // Produces the uncompressed V2 encoding that then gets compressed.
    v2_serializer: V2Serializer,
}

impl Default for V2DeflateSerializer {
    fn default() -> Self {
        Self::new()
    }
}

impl V2DeflateSerializer {
    /// Create a new serializer.
    ///
    /// Both scratch buffers start out empty, and a fresh `V2Serializer` is created for the
    /// uncompressed encoding step.
    pub fn new() -> V2DeflateSerializer {
        let uncompressed_buf = Vec::new();
        let compressed_buf = Vec::new();
        let v2_serializer = V2Serializer::new();

        Self {
            uncompressed_buf,
            compressed_buf,
            v2_serializer,
        }
    }
}

impl Serializer for V2DeflateSerializer {
    type SerializeError = V2DeflateSerializeError;

    /// Serialize `h` in the V2 + DEFLATE format and write the result to `writer`.
    ///
    /// The output is a big-endian `V2_COMPRESSED_COOKIE`, a big-endian `u32` holding the length
    /// of the compressed payload, and then the zlib-compressed plain V2 encoding of `h`.
    ///
    /// Returns the total number of bytes handed to `writer` (the 8 header bytes plus the
    /// compressed payload).
    ///
    /// # Errors
    ///
    /// * `InternalSerializationError` if the wrapped V2 serializer fails.
    /// * `IoError` if compressing, filling in the header, or writing to `writer` fails.
    fn serialize<T: Counter, W: Write>(
        &mut self,
        h: &Histogram<T>,
        writer: &mut W,
    ) -> Result<usize, V2DeflateSerializeError> {
        // TODO benchmark serializing in chunks rather than all at once: each uncompressed v2
        // chunk could be compressed and appended to the compressed buf, possibly along the lines
        // of https://github.com/HdrHistogram/HdrHistogram_rust/issues/32#issuecomment-287583055.
        // That would shrink the buffer needed for the plain v2 encoding and be more cache
        // friendly.

        // Start from empty buffers; their allocations are kept for reuse.
        self.compressed_buf.clear();
        self.uncompressed_buf.clear();

        // TODO serialize directly into uncompressed_buf without the buffering inside
        // v2_serializer
        let uncompressed_len = match self.v2_serializer.serialize(h, &mut self.uncompressed_buf) {
            Ok(len) => len,
            Err(cause) => {
                return Err(V2DeflateSerializeError::InternalSerializationError(cause));
            }
        };
        debug_assert_eq!(self.uncompressed_buf.len(), uncompressed_len);

        // Randomized test histograms only compress by about 10%, but random data is a poor
        // proxy for real data. Reserve for an optimistic 50% ratio; if that turns out to be too
        // optimistic it merely costs one extra allocation the first time around.
        let reserve_hint = self.uncompressed_buf.len() / 2;
        self.compressed_buf.reserve(reserve_hint);

        // Header: cookie, then a zeroed length field that is patched once the compressed size
        // is known.
        self.compressed_buf
            .write_u32::<BigEndian>(V2_COMPRESSED_COOKIE)?;
        self.compressed_buf.write_u32::<BigEndian>(0)?;

        // TODO pluggable compressors? configurable compression levels?
        // TODO benchmark https://github.com/sile/libflate
        // TODO if uncompressed_len is close to usize::MAX on a 16-bit target and compression
        // grows the data rather than shrinking it (which cannot really be predicted), appending
        // to compressed_buf could panic as the Vec overflows its internal `usize`.
        {
            // TODO reuse deflate buf, or switch to lower-level flate2::Compress
            let payload = &self.uncompressed_buf[..uncompressed_len];
            let mut encoder = ZlibEncoder::new(&mut self.compressed_buf, Compression::default());
            encoder.write_all(payload)?;
            encoder.finish()?;
        }

        // Patch the length field. The subtraction cannot underflow because the header alone is
        // 8 bytes, and the cast cannot lose bits: the largest array is about 6 million entries,
        // i.e. roughly 54MiB encoded with u64 counters.
        let total_compressed_len = self.compressed_buf.len();
        let payload_len = (total_compressed_len as u32) - 8;
        (&mut self.compressed_buf[4..8]).write_u32::<BigEndian>(payload_len)?;

        writer.write_all(&self.compressed_buf)?;

        Ok(total_compressed_len)
    }
}