use super::Encode;
use super::COMPRESSION_ERROR;
use brotli::{CompressorWriter, DecompressorWriter};
use bytes::Bytes;
use pingora_error::{OrErr, Result};
use std::io::Write;
use std::time::{Duration, Instant};
pub struct Decompressor {
decompress: DecompressorWriter<Vec<u8>>,
total_in: usize,
total_out: usize,
duration: Duration,
}
impl Decompressor {
pub fn new() -> Self {
Decompressor {
decompress: DecompressorWriter::new(vec![], 0),
total_in: 0,
total_out: 0,
duration: Duration::new(0, 0),
}
}
}
impl Encode for Decompressor {
fn encode(&mut self, input: &[u8], end: bool) -> Result<Bytes> {
const MAX_INIT_COMPRESSED_SIZE_CAP: usize = 4 * 1024;
const ESTIMATED_COMPRESSION_RATIO: usize = 4;
let start = Instant::now();
self.total_in += input.len();
let reserve_size = if input.len() < MAX_INIT_COMPRESSED_SIZE_CAP {
input.len() * ESTIMATED_COMPRESSION_RATIO
} else {
input.len()
};
self.decompress.get_mut().reserve(reserve_size);
self.decompress
.write_all(input)
.or_err(COMPRESSION_ERROR, "while decompress Brotli")?;
if end {
self.decompress
.flush()
.or_err(COMPRESSION_ERROR, "while decompress Brotli")?;
}
self.total_out += self.decompress.get_ref().len();
self.duration += start.elapsed();
Ok(std::mem::take(self.decompress.get_mut()).into()) }
fn stat(&self) -> (&'static str, usize, usize, Duration) {
("de-brotli", self.total_in, self.total_out, self.duration)
}
}
pub struct Compressor {
compress: CompressorWriter<Vec<u8>>,
total_in: usize,
total_out: usize,
duration: Duration,
}
impl Compressor {
pub fn new(level: u32) -> Self {
Compressor {
compress: CompressorWriter::new(vec![], 4096, level, 19),
total_in: 0,
total_out: 0,
duration: Duration::new(0, 0),
}
}
}
impl Encode for Compressor {
fn encode(&mut self, input: &[u8], end: bool) -> Result<Bytes> {
const MAX_INIT_COMPRESSED_BUF_SIZE: usize = 16 * 1024;
let start = Instant::now();
self.total_in += input.len();
self.compress
.get_mut()
.reserve(std::cmp::min(MAX_INIT_COMPRESSED_BUF_SIZE, input.len()));
self.compress
.write_all(input)
.or_err(COMPRESSION_ERROR, "while compress Brotli")?;
if end {
self.compress
.flush()
.or_err(COMPRESSION_ERROR, "while compress Brotli")?;
}
self.total_out += self.compress.get_ref().len();
self.duration += start.elapsed();
Ok(std::mem::take(self.compress.get_mut()).into()) }
fn stat(&self) -> (&'static str, usize, usize, Duration) {
("brotli", self.total_in, self.total_out, self.duration)
}
}
#[cfg(test)]
mod tests_stream {
use super::*;
#[test]
fn decompress_brotli_data() {
let mut compressor = Decompressor::new();
let decompressed = compressor
.encode(
&[
0x1f, 0x0f, 0x00, 0xf8, 0x45, 0x07, 0x87, 0x3e, 0x10, 0xfb, 0x55, 0x92, 0xec,
0x12, 0x09, 0xcc, 0x38, 0xdd, 0x51, 0x1e,
],
true,
)
.unwrap();
assert_eq!(&decompressed[..], &b"adcdefgabcdefgh\n"[..]);
}
#[test]
fn compress_brotli_data() {
let mut compressor = Compressor::new(11);
let compressed = compressor.encode(&b"adcdefgabcdefgh\n"[..], true).unwrap();
assert_eq!(
&compressed[..],
&[
0x85, 0x07, 0x00, 0xf8, 0x45, 0x07, 0x87, 0x3e, 0x10, 0xfb, 0x55, 0x92, 0xec, 0x12,
0x09, 0xcc, 0x38, 0xdd, 0x51, 0x1e,
],
);
}
}