use flate2::Compression;
use std::io::{self, Read, Write};
use std::path::Path;
use crate::compress::optimization::{CompressionBackend, OptimizationConfig};
use crate::compress::parallel::{GzipHeaderInfo, ParallelGzEncoder};
use crate::compress::pipelined::PipelinedGzEncoder;
/// Chooses a gzip compression strategy and runs it.
///
/// The strategy is one of: single-threaded flate2, [`ParallelGzEncoder`],
/// [`PipelinedGzEncoder`], or a single memory-mapped member. It is selected from an
/// [`OptimizationConfig`].
pub struct SimpleOptimizer {
    /// Tuning knobs. The backend, thread count and compression level fields are
    /// consulted when selecting and configuring an encoder.
    config: OptimizationConfig,
    /// Gzip header metadata (e.g. filename, mtime, comment) passed to every encoder.
    header_info: GzipHeaderInfo,
}
impl SimpleOptimizer {
pub fn new(config: OptimizationConfig) -> Self {
Self {
config,
header_info: GzipHeaderInfo::default(),
}
}
pub fn with_header_info(mut self, info: GzipHeaderInfo) -> Self {
self.header_info = info;
self
}
pub fn compress<R: Read, W: Write + Send>(&self, reader: R, writer: W) -> io::Result<u64> {
if self.config.compression_level >= 10 {
return self.compress_parallel(reader, writer);
}
match self.config.backend {
CompressionBackend::Parallel => self.compress_parallel(reader, writer),
CompressionBackend::SingleThreaded => self.compress_single_threaded(reader, writer),
}
}
fn compress_parallel<R: Read, W: Write + Send>(&self, reader: R, writer: W) -> io::Result<u64> {
let optimal_threads = self.calculate_optimal_threads();
let compression_level = self.config.compression_level as u32;
if self.config.compression_level >= 10 {
let mut encoder = ParallelGzEncoder::new(compression_level, optimal_threads);
encoder.set_header_info(self.header_info.clone());
return encoder.compress(reader, writer);
}
if self.config.compression_level >= 6 && optimal_threads > 1 {
let mut encoder = PipelinedGzEncoder::new(compression_level, optimal_threads);
encoder.set_header_info(self.header_info.clone());
return encoder.compress(reader, writer);
}
let mut encoder = ParallelGzEncoder::new(compression_level, optimal_threads);
encoder.set_header_info(self.header_info.clone());
encoder.compress(reader, writer)
}
pub fn compress_file<P: AsRef<Path>, W: Write + Send>(
&self,
path: P,
writer: W,
) -> io::Result<u64> {
let optimal_threads = self.calculate_optimal_threads();
let compression_level = self.config.compression_level as u32;
if self.config.compression_level >= 10 {
let mut encoder = ParallelGzEncoder::new(compression_level, optimal_threads);
encoder.set_header_info(self.header_info.clone());
return encoder.compress_file(path, writer);
}
if optimal_threads == 1 {
if self.config.compression_level >= 6 {
let mut encoder = PipelinedGzEncoder::new(compression_level, 1);
encoder.set_header_info(self.header_info.clone());
return encoder.compress_file(path, writer);
}
let file = std::fs::File::open(&path)?;
let mmap = unsafe { memmap2::Mmap::map(&file)? };
let mut writer = writer;
crate::compress::parallel::compress_single_member(
&mut writer,
&mmap,
compression_level,
&self.header_info,
)?;
return Ok(mmap.len() as u64);
}
if self.config.compression_level >= 6 {
let mut encoder = PipelinedGzEncoder::new(compression_level, optimal_threads);
encoder.set_header_info(self.header_info.clone());
return encoder.compress_file(path, writer);
}
let mut encoder = ParallelGzEncoder::new(compression_level, optimal_threads);
encoder.set_header_info(self.header_info.clone());
encoder.compress_file(path, writer)
}
fn compress_single_threaded<R: Read, W: Write>(
&self,
mut reader: R,
writer: W,
) -> io::Result<u64> {
let adjusted_level = if self.config.compression_level == 1 {
2
} else {
self.config.compression_level.min(9) };
let compression = Compression::new(adjusted_level as u32);
let mut builder = flate2::GzBuilder::new();
if let Some(ref name) = self.header_info.filename {
builder = builder.filename(name.as_bytes());
}
builder = builder.mtime(self.header_info.mtime);
if let Some(ref comment) = self.header_info.comment {
builder = builder.comment(comment.as_bytes());
}
let mut encoder = builder.write(writer, compression);
let bytes_written = io::copy(&mut reader, &mut encoder)?;
encoder.finish()?;
Ok(bytes_written)
}
fn calculate_optimal_threads(&self) -> usize {
self.config.thread_count
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::compress::optimization::{CompressionBackend, ContentType};
    use std::io::{Cursor, Read};

    /// Baseline configuration (level 6, 64 KiB buffers, no NUMA pinning).
    ///
    /// Tests override individual fields with struct-update syntax.
    fn config(
        thread_count: usize,
        backend: CompressionBackend,
        content_type: ContentType,
    ) -> OptimizationConfig {
        OptimizationConfig {
            thread_count,
            buffer_size: 65536,
            backend,
            content_type,
            use_numa_pinning: false,
            compression_level: 6,
        }
    }

    /// Decodes gzip output back to bytes.
    ///
    /// Multi-member streams are accepted, because parallel encoders may emit several members.
    fn gunzip(compressed: &[u8]) -> Vec<u8> {
        let mut decoded = Vec::new();
        flate2::read::MultiGzDecoder::new(compressed)
            .read_to_end(&mut decoded)
            .expect("output should be valid gzip");
        decoded
    }

    #[test]
    fn test_thread_count_respects_request() {
        let optimizer =
            SimpleOptimizer::new(config(4, CompressionBackend::Parallel, ContentType::Binary));
        assert_eq!(optimizer.calculate_optimal_threads(), 4);
    }

    #[test]
    fn test_compression() {
        let optimizer =
            SimpleOptimizer::new(config(4, CompressionBackend::Parallel, ContentType::Text));
        let input = b"Hello, world! This is a test of the simple optimizer.".repeat(1000);
        let mut output = Vec::new();
        optimizer
            .compress(Cursor::new(&input), &mut output)
            .expect("parallel compression should succeed");
        assert!(output.len() < input.len());
        // Compressed size alone does not prove correctness; the output must round-trip.
        assert_eq!(gunzip(&output), input);
    }

    #[test]
    fn test_single_threaded_compression() {
        let optimizer = SimpleOptimizer::new(config(
            1,
            CompressionBackend::SingleThreaded,
            ContentType::Text,
        ));
        let input = b"Hello, world! This is a test of single-threaded compression.".repeat(1000);
        let mut output = Vec::new();
        let bytes_read = optimizer
            .compress(Cursor::new(&input), &mut output)
            .expect("single-threaded compression should succeed");
        assert_eq!(bytes_read, input.len() as u64);
        assert!(output.len() < input.len());
        assert_eq!(gunzip(&output), input);
    }

    #[test]
    fn test_compress_file_single_member_round_trip() {
        let input = b"Hello, world! This is a test of file compression.".repeat(1000);
        let path = std::env::temp_dir().join(format!(
            "simple_optimizer_test_{}_single_member.txt",
            std::process::id()
        ));
        std::fs::write(&path, &input).expect("temp file should be writable");
        // One thread and a level below 6 selects the memory-mapped single-member path.
        let optimizer = SimpleOptimizer::new(OptimizationConfig {
            compression_level: 1,
            ..config(1, CompressionBackend::SingleThreaded, ContentType::Text)
        });
        let mut output = Vec::new();
        let result = optimizer.compress_file(&path, &mut output);
        // Clean up before asserting so a failure does not leave the file behind.
        let _ = std::fs::remove_file(&path);
        assert_eq!(
            result.expect("file compression should succeed"),
            input.len() as u64
        );
        assert_eq!(gunzip(&output), input);
    }
}