use std::fs::File;
use std::io::{Read, Write};
use std::path::Path;
use oxiarc_archive::{bzip2, gzip, lz4, zstd};
use crate::error::{IoError, Result};
pub mod ndarray;
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum CompressionAlgorithm {
Gzip,
Zstd,
Lz4,
Bzip2,
Brotli,
Snappy,
FpZip,
DeltaLz4,
}
impl CompressionAlgorithm {
pub fn extension(&self) -> &'static str {
match self {
CompressionAlgorithm::Gzip => "gz",
CompressionAlgorithm::Zstd => "zst",
CompressionAlgorithm::Lz4 => "lz4",
CompressionAlgorithm::Bzip2 => "bz2",
CompressionAlgorithm::Brotli => "br",
CompressionAlgorithm::Snappy => "snappy",
CompressionAlgorithm::FpZip => "fpz",
CompressionAlgorithm::DeltaLz4 => "dlz4",
}
}
pub fn from_extension(ext: &str) -> Option<Self> {
match ext.to_lowercase().as_str() {
"gz" | "gzip" => Some(CompressionAlgorithm::Gzip),
"zst" | "zstd" => Some(CompressionAlgorithm::Zstd),
"lz4" => Some(CompressionAlgorithm::Lz4),
"bz2" | "bzip2" => Some(CompressionAlgorithm::Bzip2),
"br" | "brotli" => Some(CompressionAlgorithm::Brotli),
"snappy" | "snp" => Some(CompressionAlgorithm::Snappy),
"fpz" | "fpzip" => Some(CompressionAlgorithm::FpZip),
"dlz4" | "delta-lz4" => Some(CompressionAlgorithm::DeltaLz4),
_ => None,
}
}
}
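// A minimal sketch of the intended invariant: every variant's canonical
// extension maps back to the same variant. Illustrative test only.
#[cfg(test)]
mod extension_roundtrip_examples {
    use super::CompressionAlgorithm;

    #[test]
    fn extension_maps_back_to_algorithm() {
        for algo in [
            CompressionAlgorithm::Gzip,
            CompressionAlgorithm::Zstd,
            CompressionAlgorithm::Lz4,
            CompressionAlgorithm::Bzip2,
            CompressionAlgorithm::Brotli,
            CompressionAlgorithm::Snappy,
            CompressionAlgorithm::FpZip,
            CompressionAlgorithm::DeltaLz4,
        ] {
            assert_eq!(
                CompressionAlgorithm::from_extension(algo.extension()),
                Some(algo)
            );
        }
    }
}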
#[allow(dead_code)]
fn normalize_level(level: Option<u32>, algorithm: CompressionAlgorithm) -> Result<u32> {
    let level = level.unwrap_or(6);
    if level > 9 {
        return Err(IoError::CompressionError(format!(
            "Compression level must be between 0 and 9, got {level}"
        )));
    }
    match algorithm {
        CompressionAlgorithm::Gzip => Ok(level),
        // Rescale the shared 0-9 scale onto zstd's 1-22 range.
        CompressionAlgorithm::Zstd => Ok(1 + (level * 21) / 9),
        CompressionAlgorithm::Lz4 => Ok(level),
        CompressionAlgorithm::Bzip2 => Ok(level),
        // Rescale onto Brotli's 0-11 range.
        CompressionAlgorithm::Brotli => Ok((level * 11) / 9),
        // Snappy has no tunable compression level.
        CompressionAlgorithm::Snappy => Ok(0),
        CompressionAlgorithm::FpZip => Ok(level),
        CompressionAlgorithm::DeltaLz4 => Ok(level),
    }
}
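// A small sketch of the level mapping: Gzip keeps the shared 0-9 scale, Zstd
// is rescaled onto 1-22 (6 maps to 1 + (6 * 21) / 9 = 15), and levels above 9
// are rejected. Illustrative test only.
#[cfg(test)]
mod level_mapping_examples {
    use super::{normalize_level, CompressionAlgorithm};

    #[test]
    fn levels_are_rescaled_per_algorithm() {
        assert_eq!(normalize_level(Some(6), CompressionAlgorithm::Gzip).unwrap(), 6);
        assert_eq!(normalize_level(Some(6), CompressionAlgorithm::Zstd).unwrap(), 15);
        assert_eq!(normalize_level(Some(9), CompressionAlgorithm::Brotli).unwrap(), 11);
        assert!(normalize_level(Some(10), CompressionAlgorithm::Gzip).is_err());
    }
}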
#[allow(dead_code)]
pub fn compress_data(
    data: &[u8],
algorithm: CompressionAlgorithm,
level: Option<u32>,
) -> Result<Vec<u8>> {
let normalized_level = normalize_level(level, algorithm)?;
match algorithm {
CompressionAlgorithm::Gzip => {
gzip::compress(data, normalized_level as u8)
.map_err(|e| IoError::CompressionError(e.to_string()))
}
CompressionAlgorithm::Zstd => {
let writer = zstd::ZstdWriter::new();
writer
.compress(data)
.map_err(|e| IoError::CompressionError(e.to_string()))
}
        CompressionAlgorithm::Lz4 => {
            let mut writer = lz4::Lz4Writer::new(Vec::new());
writer
.write_compressed(data)
.map_err(|e| IoError::CompressionError(e.to_string()))?;
Ok(writer.into_inner())
}
CompressionAlgorithm::Bzip2 => {
let writer = bzip2::Bzip2Writer::with_level(normalized_level as u8);
writer
.compress(data)
.map_err(|e| IoError::CompressionError(e.to_string()))
}
CompressionAlgorithm::Brotli => oxiarc_brotli::compress(data, normalized_level)
.map_err(|e| IoError::CompressionError(format!("Brotli compression failed: {e}"))),
CompressionAlgorithm::Snappy => Ok(oxiarc_snappy::compress(data)),
CompressionAlgorithm::FpZip => {
compress_fpzip(data, normalized_level)
}
CompressionAlgorithm::DeltaLz4 => {
compress_delta_lz4(data, normalized_level)
}
}
}
#[allow(dead_code)]
pub fn decompress_data(data: &[u8], algorithm: CompressionAlgorithm) -> Result<Vec<u8>> {
match algorithm {
CompressionAlgorithm::Gzip => {
use std::io::Cursor;
let mut cursor = Cursor::new(data);
gzip::decompress(&mut cursor).map_err(|e| IoError::DecompressionError(e.to_string()))
}
CompressionAlgorithm::Zstd => {
use std::io::Cursor;
let cursor = Cursor::new(data);
let mut reader = zstd::ZstdReader::new(cursor)
.map_err(|e| IoError::DecompressionError(e.to_string()))?;
reader
.decompress()
.map_err(|e| IoError::DecompressionError(e.to_string()))
}
CompressionAlgorithm::Lz4 => {
use std::io::Cursor;
let cursor = Cursor::new(data);
let mut reader = lz4::Lz4Reader::new(cursor)
.map_err(|e| IoError::DecompressionError(e.to_string()))?;
reader
.decompress()
.map_err(|e| IoError::DecompressionError(e.to_string()))
}
CompressionAlgorithm::Bzip2 => {
bzip2::decompress(data).map_err(|e| IoError::DecompressionError(e.to_string()))
}
CompressionAlgorithm::Brotli => oxiarc_brotli::decompress(data)
.map_err(|e| IoError::DecompressionError(format!("Brotli decompression failed: {e}"))),
CompressionAlgorithm::Snappy => oxiarc_snappy::decompress(data)
.map_err(|e| IoError::DecompressionError(format!("Snappy decompression failed: {e}"))),
CompressionAlgorithm::FpZip => {
decompress_fpzip(data)
}
CompressionAlgorithm::DeltaLz4 => {
decompress_delta_lz4(data)
}
}
}
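// A round-trip sketch for the in-memory API: compress_data followed by
// decompress_data must reproduce the input. A few representative codecs are
// exercised here; illustrative test only.
#[cfg(test)]
mod codec_roundtrip_examples {
    use super::{compress_data, decompress_data, CompressionAlgorithm};

    #[test]
    fn compress_then_decompress_is_identity() {
        let data = b"the quick brown fox jumps over the lazy dog".repeat(32);
        for algo in [
            CompressionAlgorithm::Gzip,
            CompressionAlgorithm::Zstd,
            CompressionAlgorithm::Lz4,
        ] {
            let compressed = compress_data(&data, algo, Some(6)).unwrap();
            let restored = decompress_data(&compressed, algo).unwrap();
            assert_eq!(restored, data, "round-trip failed for {algo:?}");
        }
    }
}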
#[allow(dead_code)]
pub fn compress_file<P: AsRef<Path>>(
input_path: P,
output_path: Option<P>,
algorithm: CompressionAlgorithm,
level: Option<u32>,
) -> Result<String> {
let mut input_data = Vec::new();
File::open(input_path.as_ref())
.map_err(|e| IoError::FileError(format!("Failed to open input file: {e}")))?
.read_to_end(&mut input_data)
.map_err(|e| IoError::FileError(format!("Failed to read input file: {e}")))?;
let compressed_data = compress_data(&input_data, algorithm, level)?;
let output_path_string = match output_path {
Some(path) => path.as_ref().to_string_lossy().to_string(),
None => {
let mut path_buf = input_path.as_ref().to_path_buf();
let ext = algorithm.extension();
let file_name = path_buf
.file_name()
                .ok_or_else(|| IoError::FileError("Invalid input file path".to_string()))?
.to_string_lossy()
.to_string();
let new_file_name = format!("{file_name}.{ext}");
path_buf.set_file_name(new_file_name);
path_buf.to_string_lossy().to_string()
}
};
File::create(&output_path_string)
.map_err(|e| IoError::FileError(format!("Failed to create output file: {e}")))?
.write_all(&compressed_data)
.map_err(|e| IoError::FileError(format!("Failed to write to output file: {e}")))?;
Ok(output_path_string)
}
#[allow(dead_code)]
pub fn decompress_file<P: AsRef<Path>>(
input_path: P,
output_path: Option<P>,
algorithm: Option<CompressionAlgorithm>,
) -> Result<String> {
let algorithm = match algorithm {
Some(algo) => algo,
None => {
let ext = input_path
.as_ref()
.extension()
.ok_or_else(|| {
IoError::DecompressionError("Unable to determine file extension".to_string())
})?
.to_string_lossy()
.to_string();
CompressionAlgorithm::from_extension(&ext)
.ok_or(IoError::UnsupportedCompressionAlgorithm(ext))?
}
};
let mut input_data = Vec::new();
File::open(input_path.as_ref())
.map_err(|e| IoError::FileError(format!("Failed to open input file: {e}")))?
.read_to_end(&mut input_data)
.map_err(|e| IoError::FileError(format!("Failed to read input file: {e}")))?;
let decompressed_data = decompress_data(&input_data, algorithm)?;
let output_path_string = match output_path {
Some(path) => path.as_ref().to_string_lossy().to_string(),
None => {
let path_str = input_path.as_ref().to_string_lossy().to_string();
let ext = algorithm.extension();
if path_str.ends_with(&format!(".{ext}")) {
path_str[0..path_str.len() - ext.len() - 1].to_string()
} else {
format!("{path_str}.decompressed")
}
}
};
File::create(&output_path_string)
.map_err(|e| IoError::FileError(format!("Failed to create output file: {e}")))?
.write_all(&decompressed_data)
.map_err(|e| IoError::FileError(format!("Failed to write to output file: {e}")))?;
Ok(output_path_string)
}
#[allow(dead_code)]
pub fn compression_ratio(
data: &[u8],
algorithm: CompressionAlgorithm,
level: Option<u32>,
) -> Result<f64> {
let compressed = compress_data(data, algorithm, level)?;
let original_size = data.len() as f64;
let compressed_size = compressed.len() as f64;
if compressed_size == 0.0 {
return Err(IoError::CompressionError(
"Compressed data has zero size".to_string(),
));
}
Ok(original_size / compressed_size)
}
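// compression_ratio is original_size / compressed_size, so highly repetitive
// input should score well above 1.0. Illustrative sketch only.
#[cfg(test)]
mod ratio_examples {
    use super::{compression_ratio, CompressionAlgorithm};

    #[test]
    fn repetitive_data_compresses_well() {
        let data = vec![0u8; 64 * 1024];
        let ratio = compression_ratio(&data, CompressionAlgorithm::Gzip, None).unwrap();
        assert!(ratio > 1.0);
    }
}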
pub struct CompressionInfo {
pub name: String,
pub description: String,
pub typical_compression_ratio: f64,
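    // Relative speed ratings; higher is faster (entries below use a 1-10 scale).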
pub compression_speed: u8,
pub decompression_speed: u8,
pub file_extension: String,
}
#[allow(dead_code)]
pub fn algorithm_info(algorithm: CompressionAlgorithm) -> CompressionInfo {
match algorithm {
CompressionAlgorithm::Gzip => CompressionInfo {
name: "GZIP".to_string(),
description: "General-purpose compression _algorithm with good balance of speed and compression ratio".to_string(),
typical_compression_ratio: 2.5,
compression_speed: 6,
decompression_speed: 7,
file_extension: "gz".to_string(),
},
CompressionAlgorithm::Zstd => CompressionInfo {
name: "Zstandard".to_string(),
description: "Modern compression _algorithm with excellent compression ratio and fast decompression".to_string(),
typical_compression_ratio: 3.2,
compression_speed: 7,
decompression_speed: 9,
file_extension: "zst".to_string(),
},
CompressionAlgorithm::Lz4 => CompressionInfo {
name: "LZ4".to_string(),
description: "Extremely fast compression _algorithm with moderate compression ratio".to_string(),
typical_compression_ratio: 1.8,
compression_speed: 10,
decompression_speed: 10,
file_extension: "lz4".to_string(),
},
CompressionAlgorithm::Bzip2 => CompressionInfo {
name: "BZIP2".to_string(),
description: "High compression ratio but slower speed, good for archival storage".to_string(),
typical_compression_ratio: 3.5,
compression_speed: 3,
decompression_speed: 4,
file_extension: "bz2".to_string(),
},
CompressionAlgorithm::Brotli => CompressionInfo {
name: "Brotli".to_string(),
description: "Web-optimized compression _algorithm with excellent compression ratio".to_string(),
typical_compression_ratio: 3.8,
compression_speed: 5,
decompression_speed: 8,
file_extension: "br".to_string(),
},
CompressionAlgorithm::Snappy => CompressionInfo {
name: "Snappy".to_string(),
description: "Very fast compression _algorithm developed by Google".to_string(),
typical_compression_ratio: 1.5,
compression_speed: 10,
decompression_speed: 10,
file_extension: "snappy".to_string(),
},
CompressionAlgorithm::FpZip => CompressionInfo {
name: "FPZip".to_string(),
description: "Floating-point specific compression optimized for scientific data".to_string(),
typical_compression_ratio: 4.0,
compression_speed: 7,
decompression_speed: 8,
file_extension: "fpz".to_string(),
},
CompressionAlgorithm::DeltaLz4 => CompressionInfo {
name: "Delta+LZ4".to_string(),
description: "Delta encoding followed by LZ4 compression, optimized for time series data".to_string(),
typical_compression_ratio: 5.0,
compression_speed: 8,
decompression_speed: 9,
file_extension: "dlz4".to_string(),
},
}
}
const GZIP_MAGIC: &[u8] = &[0x1f, 0x8b];
const ZSTD_MAGIC: &[u8] = &[0x28, 0xb5, 0x2f, 0xfd];
const LZ4_MAGIC: &[u8] = &[0x04, 0x22, 0x4d, 0x18];
const BZIP2_MAGIC: &[u8] = &[0x42, 0x5a, 0x68];
const BROTLI_MAGIC: &[u8] = &[0xce, 0xb2, 0xcf, 0x81];
const SNAPPY_MAGIC: &[u8] = &[0x73, 0x4e, 0x61, 0x50]; // "sNaP"
const FPZIP_MAGIC: &[u8] = &[0x46, 0x50, 0x5a, 0x49]; // "FPZI"
const DELTA_LZ4_MAGIC: &[u8] = &[0x44, 0x4c, 0x5a, 0x34]; // "DLZ4"
#[allow(dead_code)]
pub fn detect_compression_from_bytes(data: &[u8]) -> Option<CompressionAlgorithm> {
if data.starts_with(GZIP_MAGIC) {
Some(CompressionAlgorithm::Gzip)
} else if data.starts_with(ZSTD_MAGIC) {
Some(CompressionAlgorithm::Zstd)
} else if data.starts_with(LZ4_MAGIC) {
Some(CompressionAlgorithm::Lz4)
} else if data.starts_with(BZIP2_MAGIC) {
Some(CompressionAlgorithm::Bzip2)
} else if data.starts_with(BROTLI_MAGIC) {
Some(CompressionAlgorithm::Brotli)
} else if data.starts_with(SNAPPY_MAGIC) {
Some(CompressionAlgorithm::Snappy)
} else if data.starts_with(FPZIP_MAGIC) {
Some(CompressionAlgorithm::FpZip)
} else if data.starts_with(DELTA_LZ4_MAGIC) {
Some(CompressionAlgorithm::DeltaLz4)
} else {
None
}
}
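// A sketch of content-based detection: real gzip output begins with the
// 0x1f 0x8b magic, so it should be recognized without any file extension.
#[cfg(test)]
mod magic_detection_examples {
    use super::{compress_data, detect_compression_from_bytes, CompressionAlgorithm};

    #[test]
    fn gzip_output_is_detected_from_bytes() {
        let compressed = compress_data(b"hello", CompressionAlgorithm::Gzip, None).unwrap();
        assert_eq!(
            detect_compression_from_bytes(&compressed),
            Some(CompressionAlgorithm::Gzip)
        );
    }
}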
pub struct TransparentFileHandler {
pub auto_detect_extension: bool,
pub auto_detect_content: bool,
pub default_algorithm: CompressionAlgorithm,
pub default_level: Option<u32>,
}
impl Default for TransparentFileHandler {
fn default() -> Self {
Self {
auto_detect_extension: true,
auto_detect_content: true,
default_algorithm: CompressionAlgorithm::Zstd,
default_level: Some(6),
}
}
}
impl TransparentFileHandler {
pub fn new(
auto_detect_extension: bool,
auto_detect_content: bool,
default_algorithm: CompressionAlgorithm,
default_level: Option<u32>,
) -> Self {
Self {
auto_detect_extension,
auto_detect_content,
default_algorithm,
default_level,
}
}
pub fn read_file<P: AsRef<Path>>(&self, path: P) -> Result<Vec<u8>> {
let mut file_data = Vec::new();
File::open(path.as_ref())
.map_err(|e| IoError::FileError(format!("Failed to open file: {e}")))?
.read_to_end(&mut file_data)
.map_err(|e| IoError::FileError(format!("Failed to read file: {e}")))?;
let mut algorithm = None;
if self.auto_detect_extension {
if let Some(ext) = path.as_ref().extension() {
algorithm = CompressionAlgorithm::from_extension(&ext.to_string_lossy());
}
}
if algorithm.is_none() && self.auto_detect_content {
algorithm = detect_compression_from_bytes(&file_data);
}
        match algorithm {
            Some(algo) => decompress_data(&file_data, algo),
            None => Ok(file_data),
        }
}
pub fn write_file<P: AsRef<Path>>(&self, path: P, data: &[u8]) -> Result<()> {
let mut algorithm = None;
let level = self.default_level;
if self.auto_detect_extension {
if let Some(ext) = path.as_ref().extension() {
algorithm = CompressionAlgorithm::from_extension(&ext.to_string_lossy());
}
}
if algorithm.is_none() && self.should_compress_by_default(&path) {
algorithm = Some(self.default_algorithm);
}
let output_data = match algorithm {
Some(algo) => compress_data(data, algo, level)?,
None => data.to_vec(),
};
File::create(path.as_ref())
.map_err(|e| IoError::FileError(format!("Failed to create file: {e}")))?
.write_all(&output_data)
.map_err(|e| IoError::FileError(format!("Failed to write file: {e}")))?;
Ok(())
}
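    // Only consulted when extension-based detection did not already choose an
    // algorithm; in practice this fires when auto_detect_extension is off.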
fn should_compress_by_default<P: AsRef<Path>>(&self, path: P) -> bool {
if let Some(ext) = path.as_ref().extension() {
let ext_str = ext.to_string_lossy().to_lowercase();
matches!(
ext_str.as_str(),
"gz" | "gzip" | "zst" | "zstd" | "lz4" | "bz2" | "bzip2"
)
} else {
false
}
}
pub fn copy_file<P: AsRef<Path>, Q: AsRef<Path>>(
&self,
source: P,
destination: Q,
) -> Result<()> {
let data = self.read_file(source)?;
self.write_file(destination, &data)?;
Ok(())
}
pub fn file_info<P: AsRef<Path>>(&self, path: P) -> Result<FileCompressionInfo> {
let mut file_data = Vec::new();
File::open(path.as_ref())
.map_err(|e| IoError::FileError(format!("Failed to open file: {e}")))?
.read_to_end(&mut file_data)
.map_err(|e| IoError::FileError(format!("Failed to read file: {e}")))?;
let original_size = file_data.len();
let detected_algorithm = if self.auto_detect_content {
detect_compression_from_bytes(&file_data)
} else {
None
};
let extension_algorithm = if self.auto_detect_extension {
path.as_ref()
.extension()
.and_then(|ext| CompressionAlgorithm::from_extension(&ext.to_string_lossy()))
} else {
None
};
let is_compressed = detected_algorithm.is_some() || extension_algorithm.is_some();
let algorithm = detected_algorithm.or(extension_algorithm);
let uncompressed_size = if let Some(algo) = algorithm {
match decompress_data(&file_data, algo) {
Ok(decompressed) => Some(decompressed.len()),
Err(_) => None,
}
} else {
Some(original_size)
};
Ok(FileCompressionInfo {
path: path.as_ref().to_path_buf(),
is_compressed,
algorithm,
compressed_size: if is_compressed {
Some(original_size)
} else {
None
},
uncompressed_size,
compression_ratio: if let (Some(compressed), Some(uncompressed)) = (
if is_compressed {
Some(original_size)
} else {
None
},
uncompressed_size,
) {
Some(uncompressed as f64 / compressed as f64)
} else {
None
},
})
}
}
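// A usage sketch for TransparentFileHandler: a path ending in a known
// compression extension is compressed on write and transparently decompressed
// on read. Assumes std::env::temp_dir() is writable in the test environment;
// the file name below is made up for illustration.
#[cfg(test)]
mod transparent_handler_examples {
    use super::TransparentFileHandler;

    #[test]
    fn write_then_read_round_trips_through_compression() {
        let handler = TransparentFileHandler::default();
        let path = std::env::temp_dir().join("oxiarc_transparent_demo.zst");
        let payload = b"time series payload ".repeat(64);
        handler.write_file(&path, &payload).unwrap();
        let restored = handler.read_file(&path).unwrap();
        assert_eq!(restored, payload);
        let _ = std::fs::remove_file(&path);
    }
}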
#[derive(Debug, Clone)]
pub struct FileCompressionInfo {
pub path: std::path::PathBuf,
pub is_compressed: bool,
pub algorithm: Option<CompressionAlgorithm>,
pub compressed_size: Option<usize>,
pub uncompressed_size: Option<usize>,
pub compression_ratio: Option<f64>,
}
static GLOBAL_HANDLER: std::sync::OnceLock<TransparentFileHandler> = std::sync::OnceLock::new();
#[allow(dead_code)]
pub fn init_global_handler(handler: TransparentFileHandler) {
let _ = GLOBAL_HANDLER.set(handler);
}
#[allow(dead_code)]
pub fn global_handler() -> &'static TransparentFileHandler {
GLOBAL_HANDLER.get_or_init(TransparentFileHandler::default)
}
#[allow(dead_code)]
pub fn read_file_transparent<P: AsRef<Path>>(path: P) -> Result<Vec<u8>> {
global_handler().read_file(path)
}
#[allow(dead_code)]
pub fn write_file_transparent<P: AsRef<Path>>(path: P, data: &[u8]) -> Result<()> {
global_handler().write_file(path, data)
}
#[allow(dead_code)]
pub fn copy_file_transparent<P: AsRef<Path>, Q: AsRef<Path>>(
source: P,
destination: Q,
) -> Result<()> {
global_handler().copy_file(source, destination)
}
#[allow(dead_code)]
pub fn file_info_transparent<P: AsRef<Path>>(path: P) -> Result<FileCompressionInfo> {
global_handler().file_info(path)
}
#[allow(dead_code)]
fn compress_fpzip(data: &[u8], level: u32) -> Result<Vec<u8>> {
    let mut result = Vec::with_capacity(FPZIP_MAGIC.len() + data.len());
    result.extend_from_slice(FPZIP_MAGIC);
    if data.len().is_multiple_of(8) {
        // Interpret the buffer as little-endian f64 bit patterns and XOR each
        // value with its predecessor: neighboring floats share sign, exponent,
        // and leading mantissa bits, so the XOR stream compresses well.
        // Decoding via from_le_bytes avoids the unaligned pointer cast that a
        // raw slice reinterpretation of &[u8] would require.
        let mut encoded_data = Vec::with_capacity(data.len());
        let mut prev_bits = 0u64;
        for (i, chunk) in data.chunks_exact(8).enumerate() {
            let bits = u64::from_le_bytes(chunk.try_into().unwrap());
            let out = if i == 0 { bits } else { bits ^ prev_bits };
            encoded_data.extend_from_slice(&out.to_le_bytes());
            prev_bits = bits;
        }
        let compressed = compress_data(&encoded_data, CompressionAlgorithm::Lz4, Some(level))?;
        result.extend_from_slice(&compressed);
    } else if data.len().is_multiple_of(4) {
        // Same XOR-of-bit-patterns transform over little-endian f32 values.
        let mut encoded_data = Vec::with_capacity(data.len());
        let mut prev_bits = 0u32;
        for (i, chunk) in data.chunks_exact(4).enumerate() {
            let bits = u32::from_le_bytes(chunk.try_into().unwrap());
            let out = if i == 0 { bits } else { bits ^ prev_bits };
            encoded_data.extend_from_slice(&out.to_le_bytes());
            prev_bits = bits;
        }
        let compressed = compress_data(&encoded_data, CompressionAlgorithm::Lz4, Some(level))?;
        result.extend_from_slice(&compressed);
    } else {
        // Not a whole number of floats: fall back to plain Zstd.
        let compressed = compress_data(data, CompressionAlgorithm::Zstd, Some(level))?;
        result.extend_from_slice(&compressed);
    }
    Ok(result)
}
#[allow(dead_code)]
fn decompress_fpzip(data: &[u8]) -> Result<Vec<u8>> {
    if !data.starts_with(FPZIP_MAGIC) {
        return Err(IoError::DecompressionError(
            "Invalid FPZip magic bytes".to_string(),
        ));
    }
    let compressed_data = &data[FPZIP_MAGIC.len()..];
    // The float paths wrap their payload in LZ4, while the odd-length fallback
    // in compress_fpzip uses Zstd, so try LZ4 first and fall back on failure
    // instead of propagating the error.
    let decompressed = match decompress_data(compressed_data, CompressionAlgorithm::Lz4) {
        Ok(d) => d,
        Err(_) => return decompress_data(compressed_data, CompressionAlgorithm::Zstd),
    };
    if decompressed.len().is_multiple_of(8) {
        // Undo the running XOR over little-endian 64-bit patterns.
        let mut result = Vec::with_capacity(decompressed.len());
        let mut prev = 0u64;
        for (i, chunk) in decompressed.chunks_exact(8).enumerate() {
            let raw = u64::from_le_bytes(chunk.try_into().unwrap());
            let value = if i == 0 { raw } else { raw ^ prev };
            result.extend_from_slice(&value.to_le_bytes());
            prev = value;
        }
        Ok(result)
    } else if decompressed.len().is_multiple_of(4) {
        // Undo the running XOR over little-endian 32-bit patterns.
        let mut result = Vec::with_capacity(decompressed.len());
        let mut prev = 0u32;
        for (i, chunk) in decompressed.chunks_exact(4).enumerate() {
            let raw = u32::from_le_bytes(chunk.try_into().unwrap());
            let value = if i == 0 { raw } else { raw ^ prev };
            result.extend_from_slice(&value.to_le_bytes());
            prev = value;
        }
        Ok(result)
    } else {
        // compress_fpzip only emits LZ4 payloads that are a whole number of
        // floats, so anything else is passed through unchanged.
        Ok(decompressed)
    }
}
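// A round-trip sketch for the FpZip-style path: an 8-byte-multiple buffer of
// f64 values goes through the XOR-of-bit-patterns transform and back intact.
#[cfg(test)]
mod fpzip_examples {
    use super::{compress_data, decompress_data, CompressionAlgorithm};

    #[test]
    fn f64_buffer_round_trips() {
        let values: Vec<f64> = (0..256).map(|i| (i as f64) * 0.5).collect();
        let bytes: Vec<u8> = values.iter().flat_map(|v| v.to_le_bytes()).collect();
        let compressed = compress_data(&bytes, CompressionAlgorithm::FpZip, Some(6)).unwrap();
        let restored = decompress_data(&compressed, CompressionAlgorithm::FpZip).unwrap();
        assert_eq!(restored, bytes);
    }
}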
#[allow(dead_code)]
fn compress_delta_lz4(data: &[u8], level: u32) -> Result<Vec<u8>> {
    let mut result = Vec::with_capacity(DELTA_LZ4_MAGIC.len() + data.len());
    result.extend_from_slice(DELTA_LZ4_MAGIC);
    if data.len() < 8 {
        let compressed = compress_data(data, CompressionAlgorithm::Lz4, Some(level))?;
        result.extend_from_slice(&compressed);
        return Ok(result);
    }
    let mut delta_encoded = Vec::with_capacity(data.len());
    if data.len().is_multiple_of(8) {
        // Delta-encode little-endian i64 samples: store the first value, then
        // wrapping differences. from_le_bytes avoids unaligned pointer casts.
        let mut prev = 0i64;
        for (i, chunk) in data.chunks_exact(8).enumerate() {
            let value = i64::from_le_bytes(chunk.try_into().unwrap());
            let out = if i == 0 { value } else { value.wrapping_sub(prev) };
            delta_encoded.extend_from_slice(&out.to_le_bytes());
            prev = value;
        }
    } else if data.len().is_multiple_of(4) {
        // Same scheme over little-endian i32 samples.
        let mut prev = 0i32;
        for (i, chunk) in data.chunks_exact(4).enumerate() {
            let value = i32::from_le_bytes(chunk.try_into().unwrap());
            let out = if i == 0 { value } else { value.wrapping_sub(prev) };
            delta_encoded.extend_from_slice(&out.to_le_bytes());
            prev = value;
        }
    } else {
        // Fall back to byte-wise deltas for irregular lengths.
        delta_encoded.push(data[0]);
        for i in 1..data.len() {
            delta_encoded.push(data[i].wrapping_sub(data[i - 1]));
        }
    }
    let compressed = compress_data(&delta_encoded, CompressionAlgorithm::Lz4, Some(level))?;
    result.extend_from_slice(&compressed);
    Ok(result)
}
#[allow(dead_code)]
fn decompress_delta_lz4(data: &[u8]) -> Result<Vec<u8>> {
    if !data.starts_with(DELTA_LZ4_MAGIC) {
        return Err(IoError::DecompressionError(
            "Invalid Delta-LZ4 magic bytes".to_string(),
        ));
    }
    let compressed_data = &data[DELTA_LZ4_MAGIC.len()..];
    let delta_data = decompress_data(compressed_data, CompressionAlgorithm::Lz4)?;
    if delta_data.len() < 8 {
        // Inputs under 8 bytes were stored without delta encoding.
        return Ok(delta_data);
    }
    if delta_data.len().is_multiple_of(8) {
        // Rebuild i64 samples with a running wrapping sum of the deltas.
        let mut result = Vec::with_capacity(delta_data.len());
        let mut prev = 0i64;
        for (i, chunk) in delta_data.chunks_exact(8).enumerate() {
            let raw = i64::from_le_bytes(chunk.try_into().unwrap());
            let value = if i == 0 { raw } else { prev.wrapping_add(raw) };
            result.extend_from_slice(&value.to_le_bytes());
            prev = value;
        }
        Ok(result)
    } else if delta_data.len().is_multiple_of(4) {
        // Rebuild i32 samples the same way.
        let mut result = Vec::with_capacity(delta_data.len());
        let mut prev = 0i32;
        for (i, chunk) in delta_data.chunks_exact(4).enumerate() {
            let raw = i32::from_le_bytes(chunk.try_into().unwrap());
            let value = if i == 0 { raw } else { prev.wrapping_add(raw) };
            result.extend_from_slice(&value.to_le_bytes());
            prev = value;
        }
        Ok(result)
    } else {
        // Byte-wise prefix sum for irregular lengths.
        let mut result = delta_data;
        for i in 1..result.len() {
            result[i] = result[i - 1].wrapping_add(result[i]);
        }
        Ok(result)
    }
}
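// A round-trip sketch for Delta+LZ4: slowly changing i64 samples are the
// intended best case for delta encoding, and the transform must invert exactly.
#[cfg(test)]
mod delta_lz4_examples {
    use super::{compress_data, decompress_data, CompressionAlgorithm};

    #[test]
    fn i64_series_round_trips() {
        let samples: Vec<i64> = (0..512i64).map(|i| 1_000_000 + i * 3).collect();
        let bytes: Vec<u8> = samples.iter().flat_map(|v| v.to_le_bytes()).collect();
        let compressed = compress_data(&bytes, CompressionAlgorithm::DeltaLz4, Some(6)).unwrap();
        let restored = decompress_data(&compressed, CompressionAlgorithm::DeltaLz4).unwrap();
        assert_eq!(restored, bytes);
    }
}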
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;
use std::time::Instant;
#[derive(Debug, Clone)]
pub struct ParallelCompressionConfig {
pub num_threads: usize,
pub chunk_size: usize,
pub buffer_size: usize,
pub enable_memory_mapping: bool,
}
impl Default for ParallelCompressionConfig {
    fn default() -> Self {
        Self {
            // 0 means "use every core reported by available_parallelism".
            num_threads: 0,
            chunk_size: 1024 * 1024,
            buffer_size: 64 * 1024,
            enable_memory_mapping: true,
        }
    }
}
#[derive(Debug, Clone)]
pub struct ParallelCompressionStats {
pub chunks_processed: usize,
pub bytes_processed: usize,
pub bytes_output: usize,
pub operation_time_ms: f64,
pub throughput_bps: f64,
pub compression_ratio: f64,
pub threads_used: usize,
}
#[allow(dead_code)]
pub fn compress_data_parallel(
data: &[u8],
algorithm: CompressionAlgorithm,
level: Option<u32>,
config: ParallelCompressionConfig,
) -> Result<(Vec<u8>, ParallelCompressionStats)> {
let start_time = Instant::now();
let input_size = data.len();
let num_threads = if config.num_threads == 0 {
std::thread::available_parallelism()
.map(|p| p.get())
.unwrap_or(1)
} else {
config.num_threads
};
if input_size <= config.chunk_size {
let compressed = compress_data(data, algorithm, level)?;
let operation_time = start_time.elapsed().as_secs_f64() * 1000.0;
let stats = ParallelCompressionStats {
chunks_processed: 1,
bytes_processed: input_size,
bytes_output: compressed.len(),
operation_time_ms: operation_time,
throughput_bps: input_size as f64 / (operation_time / 1000.0),
compression_ratio: input_size as f64 / compressed.len() as f64,
threads_used: 1,
};
return Ok((compressed, stats));
}
let chunk_size = config.chunk_size;
let chunks: Vec<&[u8]> = data.chunks(chunk_size).collect();
let chunk_count = chunks.len();
let processed_count = Arc::new(AtomicUsize::new(0));
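    // NOTE: chunks are currently compressed one at a time on the calling
    // thread; num_threads only feeds the reported stats, and the chunked
    // framing below is what would let a parallel iterator slot in here.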
let compressed_chunks: Result<Vec<Vec<u8>>> = chunks
.into_iter()
.map(|chunk| {
let result = compress_data(chunk, algorithm, level);
processed_count.fetch_add(1, Ordering::Relaxed);
result
})
.collect();
let compressed_chunks = compressed_chunks?;
let total_compressed_size: usize = compressed_chunks.iter().map(|chunk| chunk.len()).sum();
    let mut result = Vec::with_capacity(8 + total_compressed_size + (chunk_count * 8));
    // Frame header: chunk count, then one length entry per chunk.
    result.extend_from_slice(&(chunk_count as u64).to_le_bytes());
for chunk in &compressed_chunks {
result.extend_from_slice(&(chunk.len() as u64).to_le_bytes());
}
for chunk in compressed_chunks {
result.extend_from_slice(&chunk);
}
let operation_time = start_time.elapsed().as_secs_f64() * 1000.0;
let stats = ParallelCompressionStats {
chunks_processed: chunk_count,
bytes_processed: input_size,
bytes_output: result.len(),
operation_time_ms: operation_time,
throughput_bps: input_size as f64 / (operation_time / 1000.0),
compression_ratio: input_size as f64 / result.len() as f64,
threads_used: num_threads,
};
Ok((result, stats))
}
#[allow(dead_code)]
pub fn decompress_data_parallel(
data: &[u8],
algorithm: CompressionAlgorithm,
config: ParallelCompressionConfig,
) -> Result<(Vec<u8>, ParallelCompressionStats)> {
let start_time = Instant::now();
let input_size = data.len();
let num_threads = if config.num_threads == 0 {
std::thread::available_parallelism()
.map(|p| p.get())
.unwrap_or(1)
} else {
config.num_threads
};
if data.len() < 8 {
let decompressed = decompress_data(data, algorithm)?;
let operation_time = start_time.elapsed().as_secs_f64() * 1000.0;
let stats = ParallelCompressionStats {
chunks_processed: 1,
bytes_processed: input_size,
bytes_output: decompressed.len(),
operation_time_ms: operation_time,
throughput_bps: decompressed.len() as f64 / (operation_time / 1000.0),
compression_ratio: decompressed.len() as f64 / input_size as f64,
threads_used: 1,
};
return Ok((decompressed, stats));
}
let chunk_count = u64::from_le_bytes(
data[0..8]
.try_into()
.map_err(|_| IoError::DecompressionError("Invalid chunk header".to_string()))?,
) as usize;
if chunk_count == 0 || chunk_count > data.len() / 8 {
let decompressed = decompress_data(data, algorithm)?;
let operation_time = start_time.elapsed().as_secs_f64() * 1000.0;
let stats = ParallelCompressionStats {
chunks_processed: 1,
bytes_processed: input_size,
bytes_output: decompressed.len(),
operation_time_ms: operation_time,
throughput_bps: decompressed.len() as f64 / (operation_time / 1000.0),
compression_ratio: decompressed.len() as f64 / input_size as f64,
threads_used: 1,
};
return Ok((decompressed, stats));
}
let header_size = 8 + (chunk_count * 8);
if data.len() < header_size {
return Err(IoError::DecompressionError(
"Truncated chunk headers".to_string(),
));
}
let mut chunk_sizes = Vec::with_capacity(chunk_count);
for i in 0..chunk_count {
let start_idx = 8 + (i * 8);
let size = u64::from_le_bytes(
data[start_idx..start_idx + 8]
.try_into()
.map_err(|_| IoError::DecompressionError("Invalid chunk size".to_string()))?,
) as usize;
chunk_sizes.push(size);
}
let mut chunks = Vec::with_capacity(chunk_count);
let mut offset = header_size;
for &size in &chunk_sizes {
if offset + size > data.len() {
return Err(IoError::DecompressionError(
"Truncated chunk data".to_string(),
));
}
chunks.push(&data[offset..offset + size]);
offset += size;
}
let processed_count = Arc::new(AtomicUsize::new(0));
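    // As in compress_data_parallel, chunks are currently decompressed
    // sequentially on the calling thread.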
let decompressed_chunks: Result<Vec<Vec<u8>>> = chunks
.into_iter()
.map(|chunk| {
let result = decompress_data(chunk, algorithm);
processed_count.fetch_add(1, Ordering::Relaxed);
result
})
.collect();
let decompressed_chunks = decompressed_chunks?;
let total_size: usize = decompressed_chunks.iter().map(|chunk| chunk.len()).sum();
let mut result = Vec::with_capacity(total_size);
for chunk in decompressed_chunks {
result.extend_from_slice(&chunk);
}
let operation_time = start_time.elapsed().as_secs_f64() * 1000.0;
let stats = ParallelCompressionStats {
chunks_processed: chunk_count,
bytes_processed: input_size,
bytes_output: result.len(),
operation_time_ms: operation_time,
throughput_bps: result.len() as f64 / (operation_time / 1000.0),
compression_ratio: result.len() as f64 / input_size as f64,
threads_used: num_threads,
};
Ok((result, stats))
}
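// A sketch of the chunked container format: a chunk_size smaller than the
// input forces multi-chunk framing, and decompress_data_parallel must undo it.
#[cfg(test)]
mod parallel_examples {
    use super::{
        compress_data_parallel, decompress_data_parallel, CompressionAlgorithm,
        ParallelCompressionConfig,
    };

    #[test]
    fn chunked_round_trip_restores_input() {
        let data = b"sensor reading 42;".repeat(4096);
        let config = ParallelCompressionConfig {
            chunk_size: 16 * 1024,
            ..ParallelCompressionConfig::default()
        };
        let (compressed, comp_stats) =
            compress_data_parallel(&data, CompressionAlgorithm::Zstd, Some(6), config.clone())
                .unwrap();
        assert!(comp_stats.chunks_processed > 1);
        let (restored, _) =
            decompress_data_parallel(&compressed, CompressionAlgorithm::Zstd, config).unwrap();
        assert_eq!(restored, data);
    }
}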
#[allow(dead_code)]
pub fn compress_file_parallel<P: AsRef<Path>>(
input_path: P,
output_path: Option<P>,
algorithm: CompressionAlgorithm,
level: Option<u32>,
config: ParallelCompressionConfig,
) -> Result<(String, ParallelCompressionStats)> {
let mut input_data = Vec::new();
File::open(input_path.as_ref())
.map_err(|e| IoError::FileError(format!("Failed to open input file: {e}")))?
.read_to_end(&mut input_data)
.map_err(|e| IoError::FileError(format!("Failed to read input file: {e}")))?;
let (compressed_data, stats) = compress_data_parallel(&input_data, algorithm, level, config)?;
let output_path_string = match output_path {
Some(path) => path.as_ref().to_string_lossy().to_string(),
None => {
let mut path_buf = input_path.as_ref().to_path_buf();
let ext = algorithm.extension();
let file_name = path_buf
.file_name()
                .ok_or_else(|| IoError::FileError("Invalid input file path".to_string()))?
.to_string_lossy()
.to_string();
let new_file_name = format!("{file_name}.{ext}");
path_buf.set_file_name(new_file_name);
path_buf.to_string_lossy().to_string()
}
};
File::create(&output_path_string)
.map_err(|e| IoError::FileError(format!("Failed to create output file: {e}")))?
.write_all(&compressed_data)
.map_err(|e| IoError::FileError(format!("Failed to write to output file: {e}")))?;
Ok((output_path_string, stats))
}
#[allow(dead_code)]
pub fn decompress_file_parallel<P: AsRef<Path>>(
input_path: P,
output_path: Option<P>,
algorithm: Option<CompressionAlgorithm>,
config: ParallelCompressionConfig,
) -> Result<(String, ParallelCompressionStats)> {
let algorithm = match algorithm {
Some(algo) => algo,
None => {
let ext = input_path
.as_ref()
.extension()
.ok_or_else(|| {
IoError::DecompressionError("Unable to determine file extension".to_string())
})?
.to_string_lossy()
.to_string();
CompressionAlgorithm::from_extension(&ext)
.ok_or(IoError::UnsupportedCompressionAlgorithm(ext))?
}
};
let mut input_data = Vec::new();
File::open(input_path.as_ref())
.map_err(|e| IoError::FileError(format!("Failed to open input file: {e}")))?
.read_to_end(&mut input_data)
.map_err(|e| IoError::FileError(format!("Failed to read input file: {e}")))?;
let (decompressed_data, stats) = decompress_data_parallel(&input_data, algorithm, config)?;
let output_path_string = match output_path {
Some(path) => path.as_ref().to_string_lossy().to_string(),
None => {
let path_str = input_path.as_ref().to_string_lossy().to_string();
let ext = algorithm.extension();
if path_str.ends_with(&format!(".{ext}")) {
path_str[0..path_str.len() - ext.len() - 1].to_string()
} else {
format!("{path_str}.decompressed")
}
}
};
File::create(&output_path_string)
.map_err(|e| IoError::FileError(format!("Failed to create output file: {e}")))?
.write_all(&decompressed_data)
.map_err(|e| IoError::FileError(format!("Failed to write to output file: {e}")))?;
Ok((output_path_string, stats))
}
#[allow(dead_code)]
pub fn benchmark_compression_algorithms(
data: &[u8],
algorithms: &[CompressionAlgorithm],
levels: &[u32],
parallel_configs: &[ParallelCompressionConfig],
) -> Result<Vec<CompressionBenchmarkResult>> {
let mut results = Vec::new();
for &algorithm in algorithms {
for &level in levels {
let start_time = Instant::now();
let compressed = compress_data(data, algorithm, Some(level))?;
let sequential_time = start_time.elapsed().as_secs_f64() * 1000.0;
let decompressed = decompress_data(&compressed, algorithm)?;
let sequential_decomp_time =
start_time.elapsed().as_secs_f64() * 1000.0 - sequential_time;
assert_eq!(data, &decompressed, "Round-trip failed for {algorithm:?}");
for config in parallel_configs {
let (par_compressed, par_comp_stats) =
compress_data_parallel(data, algorithm, Some(level), config.clone())?;
let (par_decompressed, par_decomp_stats) =
decompress_data_parallel(&par_compressed, algorithm, config.clone())?;
assert_eq!(
data, &par_decompressed,
"Parallel round-trip failed for {algorithm:?}"
);
results.push(CompressionBenchmarkResult {
algorithm,
level,
config: config.clone(),
input_size: data.len(),
compressed_size: compressed.len(),
parallel_compressed_size: par_compressed.len(),
sequential_compression_time_ms: sequential_time,
sequential_decompression_time_ms: sequential_decomp_time,
parallel_compression_stats: par_comp_stats,
parallel_decompression_stats: par_decomp_stats,
compression_ratio: data.len() as f64 / compressed.len() as f64,
parallel_compression_ratio: data.len() as f64 / par_compressed.len() as f64,
});
}
}
}
Ok(results)
}
#[derive(Debug, Clone)]
pub struct CompressionBenchmarkResult {
pub algorithm: CompressionAlgorithm,
pub level: u32,
pub config: ParallelCompressionConfig,
pub input_size: usize,
pub compressed_size: usize,
pub parallel_compressed_size: usize,
pub sequential_compression_time_ms: f64,
pub sequential_decompression_time_ms: f64,
pub parallel_compression_stats: ParallelCompressionStats,
pub parallel_decompression_stats: ParallelCompressionStats,
pub compression_ratio: f64,
pub parallel_compression_ratio: f64,
}
impl CompressionBenchmarkResult {
pub fn compression_speedup(&self) -> f64 {
self.sequential_compression_time_ms / self.parallel_compression_stats.operation_time_ms
}
pub fn decompression_speedup(&self) -> f64 {
self.sequential_decompression_time_ms / self.parallel_decompression_stats.operation_time_ms
}
pub fn compression_overhead(&self) -> f64 {
self.parallel_compressed_size as f64 / self.compressed_size as f64
}
}
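// A usage sketch for the benchmark helper: one codec, one level, one parallel
// config. Timings vary by machine, so only structural properties are checked.
#[cfg(test)]
mod benchmark_examples {
    use super::{
        benchmark_compression_algorithms, CompressionAlgorithm, ParallelCompressionConfig,
    };

    #[test]
    fn benchmark_reports_one_result_per_combination() {
        let data = vec![7u8; 128 * 1024];
        let results = benchmark_compression_algorithms(
            &data,
            &[CompressionAlgorithm::Lz4],
            &[3],
            &[ParallelCompressionConfig::default()],
        )
        .unwrap();
        assert_eq!(results.len(), 1);
        assert!(results[0].compression_ratio >= 1.0);
    }
}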