use crate::gzip::gzip_compress;
use oxiarc_core::error::Result;
use rayon::prelude::*;
/// Chunk size in bytes (512 KiB) used by [`compress_gzip_parallel`] and
/// [`compress_deflate_parallel`] when splitting their input.
pub const DEFAULT_CHUNK_SIZE: usize = 512 * 1024;
/// Chunk size in bytes (1 MiB) that [`ParallelGzipEncoder::new`] starts with.
pub const DEFAULT_PARALLEL_CHUNK_SIZE: usize = 1024 * 1024;
/// Smallest chunk size in bytes (64 KiB) that [`gzip_compress_parallel`] will use;
/// smaller requests are raised to this value.
const MIN_CHUNK_SIZE: usize = 65536;
pub fn compress_gzip_parallel(input: &[u8], level: u8) -> Result<Vec<u8>> {
if input.is_empty() {
return gzip_compress(input, level);
}
let chunks: Vec<&[u8]> = input.chunks(DEFAULT_CHUNK_SIZE).collect();
let members: Vec<Vec<u8>> = chunks
.par_iter()
.map(|chunk| gzip_compress(chunk, level))
.collect::<Result<Vec<_>>>()?;
let total_len: usize = members.iter().map(|m| m.len()).sum();
let mut output = Vec::with_capacity(total_len);
members
.into_iter()
.for_each(|m| output.extend_from_slice(&m));
Ok(output)
}
pub fn compress_deflate_parallel(input: &[u8], level: u8) -> Result<Vec<u8>> {
use crate::deflate::deflate;
if input.is_empty() {
return deflate(input, level);
}
let chunks: Vec<&[u8]> = input.chunks(DEFAULT_CHUNK_SIZE).collect();
let streams: Vec<Vec<u8>> = chunks
.par_iter()
.map(|chunk| deflate(chunk, level))
.collect::<Result<Vec<_>>>()?;
let total_len: usize = streams.iter().map(|s| s.len()).sum();
let mut output = Vec::with_capacity(total_len);
streams
.into_iter()
.for_each(|s| output.extend_from_slice(&s));
Ok(output)
}
pub fn gzip_compress_parallel(input: &[u8], level: u32, chunk_size: usize) -> Result<Vec<u8>> {
let chunk_size = chunk_size.max(MIN_CHUNK_SIZE);
let level_u8 = (level.min(9)) as u8;
if input.is_empty() {
return gzip_compress(input, level_u8);
}
let chunks: Vec<&[u8]> = input.chunks(chunk_size).collect();
let members: Vec<Vec<u8>> = chunks
.par_iter()
.map(|chunk| gzip_compress(chunk, level_u8))
.collect::<Result<Vec<_>>>()?;
let total_len: usize = members.iter().map(|m| m.len()).sum();
let mut output = Vec::with_capacity(total_len);
members
.into_iter()
.for_each(|m| output.extend_from_slice(&m));
Ok(output)
}
/// Builder-style configuration for parallel gzip compression.
///
/// Two configurations compare equal when all three fields are equal.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct ParallelGzipEncoder {
    /// Requested compression level.
    pub level: u32,
    /// Requested size, in bytes, of each independently compressed chunk.
    pub chunk_size: usize,
    /// Requested number of worker threads; `None` when no count has been set.
    pub num_threads: Option<usize>,
}
impl ParallelGzipEncoder {
#[must_use]
pub fn new() -> Self {
Self {
level: 6,
chunk_size: DEFAULT_PARALLEL_CHUNK_SIZE,
num_threads: None,
}
}
#[must_use]
pub fn level(mut self, level: u32) -> Self {
self.level = level;
self
}
#[must_use]
pub fn chunk_size(mut self, chunk_size: usize) -> Self {
self.chunk_size = chunk_size;
self
}
#[must_use]
pub fn num_threads(mut self, n: usize) -> Self {
self.num_threads = Some(n);
self
}
pub fn encode(&self, input: &[u8]) -> Result<Vec<u8>> {
gzip_compress_parallel(input, self.level, self.chunk_size)
}
}
impl Default for ParallelGzipEncoder {
fn default() -> Self {
Self::new()
}
}
#[cfg(test)]
mod tests {
    use super::*;
    use crate::gzip::gzip_decompress;

    /// Decodes a concatenation of gzip members by splitting it at gzip magic bytes
    /// and decompressing each member with `gzip_decompress`.
    ///
    /// The magic pair `1f 8b` can also occur by chance inside compressed payload, so
    /// a candidate split point is accepted only when the bytes before it decompress
    /// cleanly. Otherwise the next candidate is tried, and finally the whole
    /// remainder is decoded as one member.
    ///
    /// Panics if the remaining bytes cannot be decoded at all.
    fn decompress_multi_member(data: &[u8]) -> Vec<u8> {
        // The search for the next member's magic starts this many bytes past the
        // start of the current member.
        const GZIP_MIN_SIZE: usize = 18;

        let mut result = Vec::new();
        let mut pos = 0usize;
        while pos < data.len() {
            let search_from = pos + GZIP_MIN_SIZE;
            let split = data
                .get(search_from..)
                .unwrap_or(&[])
                .windows(2)
                .enumerate()
                .filter(|(_, pair)| pair[0] == 0x1f && pair[1] == 0x8b)
                .map(|(offset, _)| search_from + offset)
                .find_map(|end| {
                    gzip_decompress(&data[pos..end])
                        .ok()
                        .map(|chunk| (chunk, end))
                });

            let (chunk, member_end) = match split {
                Some(found) => found,
                None => {
                    let rest = gzip_decompress(&data[pos..])
                        .unwrap_or_else(|e| panic!("decompress member at {} failed: {}", pos, e));
                    (rest, data.len())
                }
            };

            result.extend_from_slice(&chunk);
            pos = member_end;
        }
        result
    }

    /// Small input (a single chunk) survives a compress/decompress roundtrip.
    #[test]
    fn test_parallel_gzip_roundtrip_basic() {
        let original: Vec<u8> = (0u8..=255).cycle().take(1000).collect();
        let compressed = compress_gzip_parallel(&original, 6).expect("compress failed");
        let decompressed = decompress_multi_member(&compressed);
        assert_eq!(decompressed, original, "basic roundtrip failed");
    }

    /// Input spanning many chunks survives a roundtrip.
    #[test]
    fn test_parallel_gzip_roundtrip_large() {
        let original: Vec<u8> = (0u8..=255).cycle().take(5_000_000).collect();
        let compressed = compress_gzip_parallel(&original, 6).expect("compress failed");
        let decompressed = decompress_multi_member(&compressed);
        assert_eq!(
            decompressed.len(),
            original.len(),
            "large roundtrip length mismatch"
        );
        assert_eq!(decompressed, original, "large roundtrip data mismatch");
    }

    /// A short repeating pattern survives a roundtrip.
    #[test]
    fn test_parallel_gzip_roundtrip_pattern() {
        let pattern = b"abcdef";
        let original: Vec<u8> = pattern.iter().copied().cycle().take(1_000_000).collect();
        let compressed = compress_gzip_parallel(&original, 6).expect("compress failed");
        let decompressed = decompress_multi_member(&compressed);
        assert_eq!(decompressed, original, "pattern roundtrip failed");
    }

    /// Serial and parallel compression both decode back to the same input.
    #[test]
    fn test_parallel_gzip_vs_serial_decompresses_identically() {
        use crate::gzip::gzip_compress;
        let original: Vec<u8> = (0u8..=127).cycle().take(200_000).collect();
        let serial = gzip_compress(&original, 6).expect("serial compress failed");
        let parallel = compress_gzip_parallel(&original, 6).expect("parallel compress failed");
        let serial_dec = gzip_decompress(&serial).expect("serial decompress failed");
        let parallel_dec = decompress_multi_member(&parallel);
        assert_eq!(
            serial_dec, original,
            "serial decompressed data does not match original"
        );
        assert_eq!(
            parallel_dec, original,
            "parallel decompressed data does not match original"
        );
    }

    /// Empty input still yields a gzip stream that decodes to nothing.
    #[test]
    fn test_parallel_gzip_empty() {
        let compressed = compress_gzip_parallel(&[], 6).expect("empty compress failed");
        assert!(
            compressed.len() >= 2,
            "empty output too short: {} bytes",
            compressed.len()
        );
        assert_eq!(compressed[0], 0x1f, "missing GZIP ID1");
        assert_eq!(compressed[1], 0x8b, "missing GZIP ID2");
        let decompressed = gzip_decompress(&compressed).expect("empty decompress failed");
        assert!(
            decompressed.is_empty(),
            "empty input should decompress to empty"
        );
    }

    /// Input smaller than one chunk is decodable by the plain decompressor.
    #[test]
    fn test_parallel_gzip_single_chunk() {
        let original: Vec<u8> = (0u8..100).collect();
        let compressed =
            compress_gzip_parallel(&original, 6).expect("single chunk compress failed");
        let decompressed = gzip_decompress(&compressed).expect("single chunk decompress failed");
        assert_eq!(decompressed, original, "single chunk roundtrip failed");
    }

    /// Sizes just below, at, and just above one chunk all roundtrip.
    #[test]
    fn test_parallel_gzip_multi_chunk_boundary() {
        for size in [
            DEFAULT_CHUNK_SIZE - 1,
            DEFAULT_CHUNK_SIZE,
            DEFAULT_CHUNK_SIZE + 1,
        ] {
            let original: Vec<u8> = (0u8..=255).cycle().take(size).collect();
            let compressed =
                compress_gzip_parallel(&original, 6).expect("boundary compress failed");
            let decompressed = decompress_multi_member(&compressed);
            assert_eq!(
                decompressed, original,
                "boundary roundtrip failed for size={}",
                size
            );
        }
    }

    /// Levels 1, 5 and 9 all roundtrip through `compress_gzip_parallel`.
    #[test]
    fn test_parallel_gzip_all_levels() {
        let original: Vec<u8> = (0u8..=255).cycle().take(10_000).collect();
        for level in [1u8, 5, 9] {
            let compressed =
                compress_gzip_parallel(&original, level).expect("all-levels compress failed");
            let decompressed = decompress_multi_member(&compressed);
            assert_eq!(
                decompressed, original,
                "all-levels roundtrip failed at level={}",
                level
            );
        }
    }

    /// Levels 1, 5 and 9 all roundtrip through `gzip_compress_parallel`.
    #[test]
    fn test_gzip_compress_parallel_roundtrip_levels() {
        let original: Vec<u8> = (0u8..=255).cycle().take(100_000).collect();
        for level in [1u32, 5, 9] {
            let compressed = gzip_compress_parallel(&original, level, DEFAULT_PARALLEL_CHUNK_SIZE)
                .unwrap_or_else(|e| panic!("compress failed at level {}: {}", level, e));
            let decompressed = gzip_decompress(&compressed)
                .unwrap_or_else(|e| panic!("decompress failed at level {}: {}", level, e));
            assert_eq!(
                decompressed, original,
                "roundtrip failed at level {}",
                level
            );
        }
    }

    /// Empty input still yields a gzip stream that decodes to nothing.
    #[test]
    fn test_gzip_compress_parallel_empty_input() {
        let compressed = gzip_compress_parallel(&[], 6, DEFAULT_PARALLEL_CHUNK_SIZE)
            .expect("empty compress failed");
        assert!(
            compressed.len() >= 2,
            "empty output too short: {} bytes",
            compressed.len()
        );
        assert_eq!(compressed[0], 0x1f, "missing GZIP ID1");
        assert_eq!(compressed[1], 0x8b, "missing GZIP ID2");
        let decompressed = gzip_decompress(&compressed).expect("empty decompress failed");
        assert!(
            decompressed.is_empty(),
            "empty input should decompress to empty output"
        );
    }

    /// Input far smaller than one chunk roundtrips.
    #[test]
    fn test_gzip_compress_parallel_sub_chunk() {
        let original: Vec<u8> = b"small sub-chunk data".to_vec();
        let compressed = gzip_compress_parallel(&original, 6, DEFAULT_PARALLEL_CHUNK_SIZE)
            .expect("sub-chunk compress failed");
        let decompressed = gzip_decompress(&compressed).expect("sub-chunk decompress failed");
        assert_eq!(decompressed, original, "sub-chunk roundtrip failed");
    }

    /// Input spanning three full chunks plus a partial one roundtrips.
    #[test]
    fn test_gzip_compress_parallel_multi_chunk() {
        let chunk_size = DEFAULT_PARALLEL_CHUNK_SIZE;
        let original: Vec<u8> = (0u8..=255).cycle().take(chunk_size * 3 + 12345).collect();
        let compressed =
            gzip_compress_parallel(&original, 6, chunk_size).expect("multi-chunk compress failed");
        let decompressed = decompress_multi_member(&compressed);
        assert_eq!(decompressed, original, "multi-chunk roundtrip failed");
    }

    /// Compressing the same input twice yields identical bytes.
    #[test]
    fn test_gzip_compress_parallel_determinism() {
        let original: Vec<u8> = (0u8..=127).cycle().take(500_000).collect();
        let first = gzip_compress_parallel(&original, 6, DEFAULT_PARALLEL_CHUNK_SIZE)
            .expect("first compress failed");
        let second = gzip_compress_parallel(&original, 6, DEFAULT_PARALLEL_CHUNK_SIZE)
            .expect("second compress failed");
        assert_eq!(first, second, "parallel compression is not deterministic");
    }

    /// A single byte roundtrips.
    #[test]
    fn test_gzip_compress_parallel_one_byte() {
        let original = vec![0xABu8];
        let compressed = gzip_compress_parallel(&original, 6, DEFAULT_PARALLEL_CHUNK_SIZE)
            .expect("1-byte compress failed");
        let decompressed = gzip_decompress(&compressed).expect("1-byte decompress failed");
        assert_eq!(decompressed, original, "1-byte roundtrip failed");
    }

    /// The encoder's default configuration roundtrips.
    #[test]
    fn test_parallel_gzip_encoder_builder_roundtrip() {
        let original: Vec<u8> = (0u8..=255).cycle().take(200_000).collect();
        let encoder = ParallelGzipEncoder::new();
        let compressed = encoder.encode(&original).expect("encoder compress failed");
        let decompressed = decompress_multi_member(&compressed);
        assert_eq!(decompressed, original, "builder default roundtrip failed");
    }

    /// A custom level and chunk size set through the builder roundtrip.
    #[test]
    fn test_parallel_gzip_encoder_builder_custom_level_and_chunk() {
        let original: Vec<u8> = (0u8..=255).cycle().take(300_000).collect();
        let encoder = ParallelGzipEncoder::new().level(9).chunk_size(100_000);
        let compressed = encoder.encode(&original).expect("encoder compress failed");
        let decompressed = decompress_multi_member(&compressed);
        assert_eq!(
            decompressed, original,
            "builder custom config roundtrip failed"
        );
    }

    /// `Default::default()` matches `new()`.
    #[test]
    fn test_parallel_gzip_encoder_builder_default_impl() {
        let encoder_a = ParallelGzipEncoder::new();
        let encoder_b = ParallelGzipEncoder::default();
        assert_eq!(encoder_a.level, encoder_b.level);
        assert_eq!(encoder_a.chunk_size, encoder_b.chunk_size);
        assert!(encoder_b.num_threads.is_none());
    }

    /// The builder records the requested thread count.
    #[test]
    fn test_parallel_gzip_encoder_num_threads_stored() {
        let encoder = ParallelGzipEncoder::new().num_threads(4);
        assert_eq!(encoder.num_threads, Some(4));
    }

    /// A chunk size below the minimum is raised rather than rejected.
    #[test]
    fn test_gzip_compress_parallel_chunk_size_clamped() {
        let original: Vec<u8> = (0u8..=255).cycle().take(200_000).collect();
        let compressed =
            gzip_compress_parallel(&original, 6, 1).expect("clamped chunk_size compress failed");
        let decompressed = decompress_multi_member(&compressed);
        assert_eq!(
            decompressed, original,
            "clamped chunk_size roundtrip failed"
        );
    }
}