use crate::{CompressionAlgorithm, CompressionError, Result};
use bytes::{Bytes, BytesMut};
#[cfg(feature = "gzip")]
use flate2::Compression as GzipCompression;
#[cfg(feature = "gzip")]
use flate2::write::GzEncoder;
#[cfg(feature = "brotli")]
use brotli::CompressorWriter as BrotliEncoder;
#[cfg(feature = "zstd")]
use zstd::stream::write::Encoder as ZstdEncoder;
use std::io::Write;
#[derive(Debug, Clone)]
pub struct StreamingConfig {
pub algorithm: CompressionAlgorithm,
pub level: u32,
pub flush_interval: usize,
pub min_chunk_size: usize,
pub buffer_size: usize,
}
impl Default for StreamingConfig {
fn default() -> Self {
Self {
algorithm: CompressionAlgorithm::Auto,
level: 6,
flush_interval: 4096,
min_chunk_size: 64,
buffer_size: 8192,
}
}
}
impl StreamingConfig {
pub fn new() -> Self {
Self::default()
}
pub fn algorithm(mut self, algorithm: CompressionAlgorithm) -> Self {
self.algorithm = algorithm;
self
}
pub fn level(mut self, level: u32) -> Self {
self.level = level;
self
}
pub fn flush_interval(mut self, interval: usize) -> Self {
self.flush_interval = interval;
self
}
pub fn min_chunk_size(mut self, size: usize) -> Self {
self.min_chunk_size = size;
self
}
pub fn buffer_size(mut self, size: usize) -> Self {
self.buffer_size = size;
self
}
pub fn low_latency() -> Self {
Self {
algorithm: CompressionAlgorithm::Auto,
level: 1, flush_interval: 256, min_chunk_size: 16,
buffer_size: 1024,
}
}
pub fn high_compression() -> Self {
Self {
algorithm: CompressionAlgorithm::Auto,
level: 9, flush_interval: 16384, min_chunk_size: 1024,
buffer_size: 32768,
}
}
}
#[allow(clippy::large_enum_variant)] enum EncoderState {
None,
#[cfg(feature = "gzip")]
Gzip(GzEncoder<Vec<u8>>),
#[cfg(feature = "brotli")]
Brotli(BrotliEncoder<Vec<u8>>),
#[cfg(feature = "zstd")]
Zstd(ZstdEncoder<'static, Vec<u8>>),
}
pub struct StreamingCompressor {
config: StreamingConfig,
encoder: EncoderState,
bytes_in: u64,
bytes_out: u64,
unflushed_bytes: usize,
finished: bool,
}
impl StreamingCompressor {
pub fn new(config: StreamingConfig) -> Result<Self> {
let encoder = Self::create_encoder(&config)?;
Ok(Self {
config,
encoder,
bytes_in: 0,
bytes_out: 0,
unflushed_bytes: 0,
finished: false,
})
}
pub fn from_accept_encoding(accept_encoding: &str, config: StreamingConfig) -> Result<Self> {
let algorithm = CompressionAlgorithm::select_from_accept_encoding(accept_encoding);
let config = StreamingConfig {
algorithm,
..config
};
Self::new(config)
}
fn create_encoder(config: &StreamingConfig) -> Result<EncoderState> {
let algorithm = match config.algorithm {
CompressionAlgorithm::Auto => {
#[cfg(feature = "gzip")]
{
CompressionAlgorithm::Gzip
}
#[cfg(not(feature = "gzip"))]
{
CompressionAlgorithm::None
}
}
other => other,
};
match algorithm {
CompressionAlgorithm::None | CompressionAlgorithm::Auto => Ok(EncoderState::None),
#[cfg(feature = "gzip")]
CompressionAlgorithm::Gzip => {
let level = config.level.clamp(1, 9);
let encoder = GzEncoder::new(
Vec::with_capacity(config.buffer_size),
GzipCompression::new(level),
);
Ok(EncoderState::Gzip(encoder))
}
#[cfg(feature = "brotli")]
CompressionAlgorithm::Brotli => {
let level = config.level.clamp(0, 11);
let encoder = BrotliEncoder::new(
Vec::with_capacity(config.buffer_size),
config.buffer_size,
level,
22, );
Ok(EncoderState::Brotli(encoder))
}
#[cfg(feature = "zstd")]
CompressionAlgorithm::Zstd => {
let level = config.level.clamp(1, 22) as i32;
let encoder = ZstdEncoder::new(Vec::with_capacity(config.buffer_size), level)
.map_err(|e| CompressionError::CompressionFailed(e.to_string()))?;
Ok(EncoderState::Zstd(encoder))
}
#[allow(unreachable_patterns)]
_ => Err(CompressionError::UnsupportedAlgorithm(format!(
"{:?} not available",
algorithm
))),
}
}
pub fn encoding(&self) -> Option<&'static str> {
match &self.encoder {
EncoderState::None => None,
#[cfg(feature = "gzip")]
EncoderState::Gzip(_) => Some("gzip"),
#[cfg(feature = "brotli")]
EncoderState::Brotli(_) => Some("br"),
#[cfg(feature = "zstd")]
EncoderState::Zstd(_) => Some("zstd"),
}
}
pub fn compress_chunk(&mut self, data: &[u8]) -> Result<Bytes> {
if self.finished {
return Err(CompressionError::CompressionFailed(
"Compressor already finished".to_string(),
));
}
if data.is_empty() {
return Ok(Bytes::new());
}
self.bytes_in += data.len() as u64;
self.unflushed_bytes += data.len();
match &mut self.encoder {
EncoderState::None => {
self.bytes_out += data.len() as u64;
Ok(Bytes::copy_from_slice(data))
}
#[cfg(feature = "gzip")]
EncoderState::Gzip(encoder) => {
encoder
.write_all(data)
.map_err(|e| CompressionError::CompressionFailed(e.to_string()))?;
if self.unflushed_bytes >= self.config.flush_interval {
self.flush_internal()
} else {
Ok(Bytes::new())
}
}
#[cfg(feature = "brotli")]
EncoderState::Brotli(encoder) => {
encoder
.write_all(data)
.map_err(|e| CompressionError::CompressionFailed(e.to_string()))?;
if self.unflushed_bytes >= self.config.flush_interval {
self.flush_internal()
} else {
Ok(Bytes::new())
}
}
#[cfg(feature = "zstd")]
EncoderState::Zstd(encoder) => {
encoder
.write_all(data)
.map_err(|e| CompressionError::CompressionFailed(e.to_string()))?;
if self.unflushed_bytes >= self.config.flush_interval {
self.flush_internal()
} else {
Ok(Bytes::new())
}
}
}
}
pub fn flush(&mut self) -> Result<Bytes> {
self.flush_internal()
}
fn flush_internal(&mut self) -> Result<Bytes> {
self.unflushed_bytes = 0;
match &mut self.encoder {
EncoderState::None => Ok(Bytes::new()),
#[cfg(feature = "gzip")]
EncoderState::Gzip(encoder) => {
encoder
.flush()
.map_err(|e| CompressionError::CompressionFailed(e.to_string()))?;
let inner = encoder.get_mut();
if inner.is_empty() {
return Ok(Bytes::new());
}
let output = std::mem::take(inner);
self.bytes_out += output.len() as u64;
Ok(Bytes::from(output))
}
#[cfg(feature = "brotli")]
EncoderState::Brotli(encoder) => {
encoder
.flush()
.map_err(|e| CompressionError::CompressionFailed(e.to_string()))?;
let inner = encoder.get_mut();
if inner.is_empty() {
return Ok(Bytes::new());
}
let output = std::mem::take(inner);
self.bytes_out += output.len() as u64;
Ok(Bytes::from(output))
}
#[cfg(feature = "zstd")]
EncoderState::Zstd(encoder) => {
encoder
.flush()
.map_err(|e| CompressionError::CompressionFailed(e.to_string()))?;
let inner = encoder.get_mut();
if inner.is_empty() {
return Ok(Bytes::new());
}
let output = std::mem::take(inner);
self.bytes_out += output.len() as u64;
Ok(Bytes::from(output))
}
}
}
pub fn finish(mut self) -> Result<Bytes> {
if self.finished {
return Ok(Bytes::new());
}
self.finished = true;
match self.encoder {
EncoderState::None => Ok(Bytes::new()),
#[cfg(feature = "gzip")]
EncoderState::Gzip(encoder) => {
let output = encoder
.finish()
.map_err(|e| CompressionError::CompressionFailed(e.to_string()))?;
self.bytes_out += output.len() as u64;
Ok(Bytes::from(output))
}
#[cfg(feature = "brotli")]
EncoderState::Brotli(mut encoder) => {
encoder
.flush()
.map_err(|e| CompressionError::CompressionFailed(e.to_string()))?;
let output = encoder.into_inner();
self.bytes_out += output.len() as u64;
Ok(Bytes::from(output))
}
#[cfg(feature = "zstd")]
EncoderState::Zstd(encoder) => {
let output = encoder
.finish()
.map_err(|e| CompressionError::CompressionFailed(e.to_string()))?;
self.bytes_out += output.len() as u64;
Ok(Bytes::from(output))
}
}
}
pub fn stats(&self) -> CompressionStats {
CompressionStats {
bytes_in: self.bytes_in,
bytes_out: self.bytes_out,
ratio: if self.bytes_in > 0 {
self.bytes_out as f64 / self.bytes_in as f64
} else {
1.0
},
}
}
}
#[derive(Debug, Clone, Copy)]
pub struct CompressionStats {
pub bytes_in: u64,
pub bytes_out: u64,
pub ratio: f64,
}
impl CompressionStats {
pub fn savings_percent(&self) -> f64 {
if self.bytes_in == 0 {
return 0.0;
}
(1.0 - self.ratio) * 100.0
}
}
pub struct AsyncStreamingCompressor {
inner: StreamingCompressor,
#[allow(dead_code)] pending: BytesMut,
}
impl AsyncStreamingCompressor {
pub fn new(config: StreamingConfig) -> Result<Self> {
Ok(Self {
inner: StreamingCompressor::new(config)?,
pending: BytesMut::with_capacity(8192),
})
}
pub fn encoding(&self) -> Option<&'static str> {
self.inner.encoding()
}
pub async fn process(&mut self, chunk: Bytes) -> Result<Bytes> {
self.inner.compress_chunk(&chunk)
}
pub async fn flush(&mut self) -> Result<Bytes> {
self.inner.flush()
}
pub fn finish(self) -> Result<Bytes> {
self.inner.finish()
}
pub fn stats(&self) -> CompressionStats {
self.inner.stats()
}
}
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn test_streaming_config() {
let config = StreamingConfig::new()
.algorithm(CompressionAlgorithm::None)
.level(6)
.flush_interval(1024);
assert_eq!(config.level, 6);
assert_eq!(config.flush_interval, 1024);
}
#[test]
fn test_passthrough_compressor() {
let config = StreamingConfig::new().algorithm(CompressionAlgorithm::None);
let mut compressor = StreamingCompressor::new(config).unwrap();
let data = b"Hello, World!";
let compressed = compressor.compress_chunk(data).unwrap();
assert_eq!(compressed.as_ref(), data);
let final_chunk = compressor.finish().unwrap();
assert!(final_chunk.is_empty());
}
#[test]
#[cfg(feature = "gzip")]
fn test_gzip_streaming() {
let config = StreamingConfig::new()
.algorithm(CompressionAlgorithm::Gzip)
.flush_interval(10);
let mut compressor = StreamingCompressor::new(config).unwrap();
let mut total_compressed = BytesMut::new();
for _ in 0..10 {
let data = b"Hello, World! This is a test chunk.\n";
let compressed = compressor.compress_chunk(data).unwrap();
total_compressed.extend_from_slice(&compressed);
}
let stats = compressor.stats();
assert_eq!(stats.bytes_in, 10 * 36);
let final_chunk = compressor.finish().unwrap();
total_compressed.extend_from_slice(&final_chunk);
let original_size = 10 * 36; assert!(total_compressed.len() < original_size);
}
#[test]
fn test_compression_stats() {
let stats = CompressionStats {
bytes_in: 1000,
bytes_out: 400,
ratio: 0.4,
};
assert_eq!(stats.savings_percent(), 60.0);
}
#[test]
fn test_low_latency_config() {
let config = StreamingConfig::low_latency();
assert_eq!(config.level, 1);
assert_eq!(config.flush_interval, 256);
}
#[test]
fn test_high_compression_config() {
let config = StreamingConfig::high_compression();
assert_eq!(config.level, 9);
assert!(config.flush_interval > 1024);
}
}