use std::io::{self, Write};
#[cfg(feature = "parallel")]
use rayon::prelude::*;
use super::Encoder;
use super::lzma::{Lzma2Encoder, Lzma2EncoderOptions, encode_lzma2_dict_size};
use crate::{Error, Result};
/// Default size of each independently compressed block (4 MiB).
pub const DEFAULT_BLOCK_SIZE: usize = 4 * 1024 * 1024;
/// Smallest block size the builder accepts (64 KiB); `block_size()` clamps
/// anything lower up to this, since tiny blocks add per-block overhead.
pub const MIN_BLOCK_SIZE: usize = 64 * 1024;
/// Options controlling parallel LZMA2 compression.
#[derive(Debug, Clone)]
pub struct ParallelLzma2Options {
    /// Compression preset level; `level()` clamps values above 9 down to 9.
    pub level: u32,
    /// Explicit dictionary size in bytes; `None` derives one from `level`
    /// (see `effective_dict_size`).
    pub dict_size: Option<u32>,
    /// Worker thread count; `None` uses `available_parallelism` (fallback 4).
    pub threads: Option<usize>,
    /// Size of each independently compressed block, at least `MIN_BLOCK_SIZE`.
    pub block_size: usize,
}
impl Default for ParallelLzma2Options {
fn default() -> Self {
Self {
level: 6,
dict_size: None,
threads: None,
block_size: DEFAULT_BLOCK_SIZE,
}
}
}
impl ParallelLzma2Options {
    /// Equivalent to `Default::default()`.
    pub fn new() -> Self {
        Default::default()
    }

    /// Sets the preset level, clamping anything above 9 down to 9.
    pub fn level(mut self, level: u32) -> Self {
        self.level = if level > 9 { 9 } else { level };
        self
    }

    /// Sets an explicit dictionary size in bytes.
    pub fn dict_size(mut self, size: u32) -> Self {
        self.dict_size = Some(size);
        self
    }

    /// Sets the worker thread count; zero is bumped to one.
    pub fn threads(mut self, threads: usize) -> Self {
        self.threads = Some(if threads == 0 { 1 } else { threads });
        self
    }

    /// Sets the per-block size, clamped up to `MIN_BLOCK_SIZE`.
    pub fn block_size(mut self, size: usize) -> Self {
        self.block_size = std::cmp::max(size, MIN_BLOCK_SIZE);
        self
    }

    /// The dictionary size actually used: the explicit one if set,
    /// otherwise a level-derived default (64 KiB at level 0, doubling
    /// roughly up to 64 MiB at level 8+).
    pub fn effective_dict_size(&self) -> u32 {
        if let Some(explicit) = self.dict_size {
            return explicit;
        }
        const KIB: u32 = 1024;
        const MIB: u32 = 1024 * 1024;
        match self.level {
            0 => 64 * KIB,
            1 => 256 * KIB,
            2 => MIB,
            3 => 2 * MIB,
            4 => 4 * MIB,
            5 => 8 * MIB,
            6 => 16 * MIB,
            7 => 32 * MIB,
            _ => 64 * MIB,
        }
    }

    /// The thread count actually used: the explicit one if set, otherwise
    /// the system's available parallelism (4 if that cannot be queried).
    pub fn effective_threads(&self) -> usize {
        match self.threads {
            Some(n) => n,
            None => std::thread::available_parallelism().map_or(4, |n| n.get()),
        }
    }

    /// The single LZMA2 properties byte: the encoded dictionary size.
    pub fn properties(&self) -> Vec<u8> {
        let dict_byte = encode_lzma2_dict_size(self.effective_dict_size());
        vec![dict_byte]
    }

    /// Converts these options into the serial encoder's option type,
    /// forwarding the explicit dictionary size when one was given.
    fn to_serial_options(&self) -> Lzma2EncoderOptions {
        let base = Lzma2EncoderOptions::with_preset(self.level);
        match self.dict_size {
            Some(size) => base.with_dict_size(size),
            None => base,
        }
    }
}
/// Outcome of a compression run: the bytes plus bookkeeping metrics.
#[derive(Debug, Clone)]
pub struct Lzma2CompressionResult {
    /// The complete compressed stream, including the 0x00 end marker.
    pub data: Vec<u8>,
    /// Number of independently compressed blocks (0 for empty input).
    pub blocks: usize,
    /// Input length in bytes.
    pub uncompressed_size: u64,
    /// Output length in bytes (equals `data.len()`).
    pub compressed_size: u64,
}
impl Lzma2CompressionResult {
    /// Compressed size divided by uncompressed size; defined as 1.0 for
    /// empty input to avoid dividing by zero.
    pub fn ratio(&self) -> f64 {
        match self.uncompressed_size {
            0 => 1.0,
            total => self.compressed_size as f64 / total as f64,
        }
    }

    /// Fraction of space saved, i.e. `1.0 - ratio()`.
    pub fn space_savings(&self) -> f64 {
        1.0 - self.ratio()
    }
}
/// One-shot LZMA2 encoder that compresses fixed-size blocks in parallel
/// (with the `parallel` feature) or serially otherwise.
#[derive(Debug)]
pub struct ParallelLzma2Encoder {
    // Compression settings applied to every block.
    options: ParallelLzma2Options,
}
impl ParallelLzma2Encoder {
    /// Creates an encoder using `options`.
    pub fn new(options: ParallelLzma2Options) -> Self {
        Self { options }
    }

    /// Creates an encoder with the default options (level 6, auto threads).
    pub fn with_defaults() -> Self {
        Self::new(ParallelLzma2Options::default())
    }

    /// Returns the LZMA2 properties byte(s) (encoded dictionary size)
    /// that a decoder needs to decode this encoder's output.
    pub fn properties(&self) -> Vec<u8> {
        self.options.properties()
    }

    /// Compresses `data` by splitting it into `block_size` chunks and
    /// encoding them concurrently with rayon. Each chunk becomes an
    /// independent LZMA2 sub-stream with its end marker stripped (see
    /// `compress_block`); a single 0x00 stream end marker terminates the
    /// concatenated output.
    ///
    /// # Errors
    /// Returns the first block's compression error, if any.
    #[cfg(feature = "parallel")]
    pub fn compress(&self, data: &[u8]) -> Result<Lzma2CompressionResult> {
        if data.is_empty() {
            // An empty LZMA2 stream is just the end marker byte.
            return Ok(Lzma2CompressionResult {
                data: vec![0x00],
                blocks: 0,
                uncompressed_size: 0,
                compressed_size: 1,
            });
        }
        let blocks: Vec<&[u8]> = data.chunks(self.options.block_size).collect();
        let num_blocks = blocks.len();
        let serial_opts = self.options.to_serial_options();
        // Collect straight into `Result` so the first failing block
        // short-circuits, instead of compressing everything and scanning
        // an intermediate Vec<Result<_>> for errors afterwards.
        let compressed_blocks: Vec<Vec<u8>> = blocks
            .par_iter()
            .map(|block| compress_block(block, &serial_opts))
            .collect::<Result<_>>()?;
        // Pre-size the output: total compressed bytes plus the end marker.
        let total: usize = compressed_blocks.iter().map(Vec::len).sum();
        let mut result = Vec::with_capacity(total + 1);
        for block_data in &compressed_blocks {
            result.extend_from_slice(block_data);
        }
        result.push(0x00); // stream end marker
        Ok(Lzma2CompressionResult {
            compressed_size: result.len() as u64,
            data: result,
            blocks: num_blocks,
            uncompressed_size: data.len() as u64,
        })
    }

    /// Serial fallback used when the `parallel` feature is disabled:
    /// compresses everything as a single block with the plain LZMA2
    /// encoder.
    ///
    /// # Errors
    /// Returns `Error::Io` if the underlying encoder fails.
    #[cfg(not(feature = "parallel"))]
    pub fn compress(&self, data: &[u8]) -> Result<Lzma2CompressionResult> {
        if data.is_empty() {
            // Mirror the parallel path so both cfg variants report the
            // same result for empty input (blocks == 0, lone end marker);
            // previously this path reported blocks == 1.
            return Ok(Lzma2CompressionResult {
                data: vec![0x00],
                blocks: 0,
                uncompressed_size: 0,
                compressed_size: 1,
            });
        }
        let serial_opts = self.options.to_serial_options();
        let mut compressed = Vec::new();
        {
            let mut encoder =
                Lzma2Encoder::new(std::io::Cursor::new(&mut compressed), &serial_opts);
            encoder.write_all(data).map_err(Error::Io)?;
            // finish() takes Box<Self> (Encoder trait convention in this
            // crate) and flushes the final chunk plus the end marker.
            Box::new(encoder).finish().map_err(Error::Io)?;
        }
        Ok(Lzma2CompressionResult {
            compressed_size: compressed.len() as u64,
            data: compressed,
            blocks: 1,
            uncompressed_size: data.len() as u64,
        })
    }

    /// Convenience: compress with default options, returning only the bytes.
    pub fn compress_default(data: &[u8]) -> Result<Vec<u8>> {
        let encoder = Self::with_defaults();
        let result = encoder.compress(data)?;
        Ok(result.data)
    }

    /// Convenience: compress at the given preset level, returning only the bytes.
    pub fn compress_level(data: &[u8], level: u32) -> Result<Vec<u8>> {
        let encoder = Self::new(ParallelLzma2Options::default().level(level));
        let result = encoder.compress(data)?;
        Ok(result.data)
    }
}
/// Compresses a single block into an independent LZMA2 sub-stream and
/// strips the trailing 0x00 end marker so blocks can be concatenated;
/// the caller appends one final marker for the whole stream.
///
/// Fix: the original version compressed `data` twice — a first encoder
/// pass wrote into a `Vec` that was immediately shadowed and discarded
/// (and never even `finish()`ed), doubling CPU and allocation cost for
/// every block. Only the second, complete pass is kept.
///
/// # Errors
/// Returns `Error::Io` if the underlying encoder fails.
fn compress_block(data: &[u8], options: &Lzma2EncoderOptions) -> Result<Vec<u8>> {
    let mut compressed = Vec::new();
    {
        let cursor = std::io::Cursor::new(&mut compressed);
        let mut encoder = Lzma2Encoder::new(cursor, options);
        encoder.write_all(data).map_err(Error::Io)?;
        // finish() flushes the final chunk and writes the end marker.
        Box::new(encoder).finish().map_err(Error::Io)?;
    }
    // Drop this block's end marker; the stream-level marker is added once
    // by the caller after all blocks are concatenated.
    if compressed.last() == Some(&0x00) {
        compressed.pop();
    }
    Ok(compressed)
}
/// Streaming wrapper around `ParallelLzma2Encoder`: buffers incoming
/// writes and compresses them in parallel batches, forwarding the
/// compressed bytes to `output`.
#[cfg(feature = "parallel")]
pub struct StreamingParallelLzma2Encoder<W: Write + Send> {
    // Destination writer for compressed bytes.
    output: W,
    // Compression settings used for every batch.
    options: ParallelLzma2Options,
    // Uncompressed bytes accumulated since the last batch was flushed.
    buffer: Vec<u8>,
    // Total compressed bytes forwarded so far (final end marker excluded).
    total_written: u64,
}
#[cfg(feature = "parallel")]
impl<W: Write + Send> StreamingParallelLzma2Encoder<W> {
    /// Wraps `output` with the given options; nothing is written until a
    /// batch is flushed.
    pub fn new(output: W, options: ParallelLzma2Options) -> Self {
        Self {
            output,
            options,
            buffer: Vec::new(),
            total_written: 0,
        }
    }

    /// The LZMA2 properties byte(s) a decoder needs for this stream.
    pub fn properties(&self) -> Vec<u8> {
        self.options.properties()
    }

    /// Compresses the buffered bytes as one batch and forwards them,
    /// stripping the batch's trailing 0x00 end marker so batches can be
    /// concatenated; `finish` writes the single stream-level marker.
    fn flush_buffer(&mut self) -> io::Result<()> {
        if self.buffer.is_empty() {
            return Ok(());
        }
        let result = ParallelLzma2Encoder::new(self.options.clone())
            .compress(&self.buffer)
            .map_err(|e| io::Error::other(e.to_string()))?;
        let payload = result.data.strip_suffix(&[0x00]).unwrap_or(&result.data);
        self.output.write_all(payload)?;
        self.total_written += payload.len() as u64;
        self.buffer.clear();
        Ok(())
    }

    /// Flushes any remaining buffered data, writes the 0x00 stream end
    /// marker, and returns the underlying writer.
    pub fn finish(mut self) -> io::Result<W> {
        self.flush_buffer()?;
        self.output.write_all(&[0x00])?;
        Ok(self.output)
    }

    /// Compressed bytes forwarded so far (final end marker excluded).
    pub fn bytes_written(&self) -> u64 {
        self.total_written
    }
}
#[cfg(feature = "parallel")]
impl<W: Write + Send> Write for StreamingParallelLzma2Encoder<W> {
    /// Buffers `buf`, compressing a batch once two block sizes of data
    /// have accumulated (so each batch splits into multiple parallel
    /// blocks).
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        self.buffer.extend_from_slice(buf);
        if self.buffer.len() >= self.options.block_size * 2 {
            self.flush_buffer()?;
        }
        Ok(buf.len())
    }

    /// Pushes all buffered data through the compressor, then flushes the
    /// underlying writer.
    ///
    /// Fix: the original only flushed the inner writer, leaving any
    /// buffered-but-uncompressed bytes behind and violating the
    /// `Write::flush` contract ("all intermediately buffered contents
    /// reach their destination"). Note this forces a batch boundary,
    /// which may slightly reduce the compression ratio.
    fn flush(&mut self) -> io::Result<()> {
        self.flush_buffer()?;
        self.output.flush()
    }
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::codec::lzma::Lzma2Decoder;
    use std::io::Read;

    // Defaults: level 6, dict/threads auto, block size DEFAULT_BLOCK_SIZE.
    #[test]
    fn test_parallel_options_default() {
        let opts = ParallelLzma2Options::default();
        assert_eq!(opts.level, 6);
        assert!(opts.dict_size.is_none());
        assert!(opts.threads.is_none());
        assert_eq!(opts.block_size, DEFAULT_BLOCK_SIZE);
    }

    // Builder setters store exactly what was passed (all within limits).
    #[test]
    fn test_parallel_options_builder() {
        let opts = ParallelLzma2Options::new()
            .level(9)
            .dict_size(32 * 1024 * 1024)
            .threads(8)
            .block_size(8 * 1024 * 1024);
        assert_eq!(opts.level, 9);
        assert_eq!(opts.dict_size, Some(32 * 1024 * 1024));
        assert_eq!(opts.threads, Some(8));
        assert_eq!(opts.block_size, 8 * 1024 * 1024);
    }

    // Dictionary size is derived from the level unless given explicitly.
    #[test]
    fn test_effective_dict_size() {
        let opts = ParallelLzma2Options::new().level(5);
        assert_eq!(opts.effective_dict_size(), 8 * 1024 * 1024);
        let opts_custom = ParallelLzma2Options::new().dict_size(1024 * 1024);
        assert_eq!(opts_custom.effective_dict_size(), 1024 * 1024);
    }

    // Empty input compresses to just the 0x00 stream end marker.
    // NOTE(review): with the `parallel` feature disabled, compress()
    // reports blocks == 1 even for empty input, so this assertion would
    // fail — confirm the suite always runs with `parallel` enabled.
    #[test]
    fn test_parallel_compression_empty() {
        let encoder = ParallelLzma2Encoder::with_defaults();
        let result = encoder.compress(&[]).unwrap();
        assert_eq!(result.blocks, 0);
        assert_eq!(result.uncompressed_size, 0);
        assert_eq!(result.data, vec![0x00]);
    }

    // Input smaller than one block yields exactly one block.
    #[test]
    fn test_parallel_compression_small() {
        let data = b"Hello, World!";
        let encoder = ParallelLzma2Encoder::new(ParallelLzma2Options::new().level(0));
        let result = encoder.compress(data).unwrap();
        assert_eq!(result.blocks, 1);
        assert_eq!(result.uncompressed_size, data.len() as u64);
        assert!(result.compressed_size > 0);
    }

    // Compressed output must decode back to the original bytes.
    #[test]
    fn test_parallel_compression_roundtrip() {
        let data = b"Hello, World! This is a test. ".repeat(1000);
        let encoder = ParallelLzma2Encoder::new(ParallelLzma2Options::new().level(1));
        let result = encoder.compress(&data).unwrap();
        let props = encoder.properties();
        let cursor = std::io::Cursor::new(&result.data);
        let mut decoder = Lzma2Decoder::new(cursor, &props).unwrap();
        let mut decompressed = Vec::new();
        decoder.read_to_end(&mut decompressed).unwrap();
        assert_eq!(decompressed, data);
    }

    // 5 MiB of zeros with 1 MiB blocks should split into at least 5 blocks.
    #[test]
    fn test_parallel_compression_large() {
        let data = vec![0u8; 5 * 1024 * 1024];
        let encoder = ParallelLzma2Encoder::new(
            ParallelLzma2Options::new().level(0).block_size(1024 * 1024),
        );
        let result = encoder.compress(&data).unwrap();
        assert!(result.blocks >= 5);
        assert_eq!(result.uncompressed_size, data.len() as u64);
    }

    // One-shot helper with default options produces non-empty output.
    #[test]
    fn test_compress_default() {
        let data = b"Test data for compression";
        let compressed = ParallelLzma2Encoder::compress_default(data).unwrap();
        assert!(!compressed.is_empty());
    }

    // One-shot helper with an explicit level produces non-empty output.
    #[test]
    fn test_compress_level() {
        let data = b"Test data for compression with level";
        let compressed = ParallelLzma2Encoder::compress_level(data, 3).unwrap();
        assert!(!compressed.is_empty());
    }

    // ratio() and space_savings() on a 2:1 compression result.
    #[test]
    fn test_compression_result_metrics() {
        let result = Lzma2CompressionResult {
            data: vec![],
            blocks: 10,
            uncompressed_size: 1000,
            compressed_size: 500,
        };
        assert!((result.ratio() - 0.5).abs() < 0.001);
        assert!((result.space_savings() - 0.5).abs() < 0.001);
    }

    // ratio() is defined as 1.0 for empty input (no division by zero).
    #[test]
    fn test_compression_result_empty() {
        let result = Lzma2CompressionResult {
            data: vec![],
            blocks: 0,
            uncompressed_size: 0,
            compressed_size: 0,
        };
        assert!((result.ratio() - 1.0).abs() < 0.001);
    }

    // Streaming encoder roundtrip: write, finish, then decode the output.
    #[cfg(feature = "parallel")]
    #[test]
    fn test_streaming_encoder() {
        use std::io::Cursor;
        let mut output = Vec::new();
        let opts = ParallelLzma2Options::new().level(0).block_size(1024);
        {
            let cursor = Cursor::new(&mut output);
            let mut encoder = StreamingParallelLzma2Encoder::new(cursor, opts.clone());
            encoder.write_all(b"Hello, World!").unwrap();
            encoder.finish().unwrap();
        }
        let cursor = Cursor::new(&output);
        let props = opts.properties();
        let mut decoder = Lzma2Decoder::new(cursor, &props).unwrap();
        let mut decompressed = Vec::new();
        decoder.read_to_end(&mut decompressed).unwrap();
        assert_eq!(decompressed, b"Hello, World!");
    }

    // properties() is a single, non-zero encoded dictionary-size byte.
    #[test]
    fn test_properties() {
        let opts = ParallelLzma2Options::new().level(6);
        let props = opts.properties();
        assert_eq!(props.len(), 1);
        assert!(props[0] > 0);
    }
}