#![allow(dead_code)]
use std::io::{self, Write};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Mutex;
use crate::decompress::parallel::block_finder::BlockFinder;
/// Maximum number of trailing output bytes that `update_window` keeps as history.
const WINDOW_SIZE: usize = 32768;
/// Deflate payloads shorter than this many bytes make `decompress_parallel` return `TooSmall`.
const MIN_PARALLEL_SIZE: usize = 4 * 1024 * 1024;
/// `decompress_parallel` returns `TooSmall` when asked to use fewer threads than this.
const MIN_THREADS_FOR_PARALLEL: usize = 2;
/// Distance in bytes past a partition point that `search_boundary_forward` is willing to scan.
const SEARCH_RADIUS: usize = 512 * 1024;
/// Reports whether diagnostic logging is switched on.
///
/// The `GZIPPY_DEBUG` environment variable is inspected on the first call only;
/// the answer is cached for the rest of the process lifetime.
#[inline]
fn debug_enabled() -> bool {
    static CACHED_FLAG: std::sync::OnceLock<bool> = std::sync::OnceLock::new();
    let flag = CACHED_FLAG.get_or_init(|| matches!(std::env::var("GZIPPY_DEBUG"), Ok(_)));
    *flag
}
/// Decompresses a gzip buffer by speculatively decoding chunks on several
/// threads and then confirming them in stream order.
///
/// The whole output is verified against the size and CRC32 stored in the last
/// eight bytes of `gzip_data` before anything is handed to `writer`.
///
/// Returns the number of decompressed bytes written.
///
/// # Errors
///
/// * `ParallelError::InvalidHeader` when the gzip header cannot be skipped.
/// * `ParallelError::TooSmall` when the input is shorter than header plus
///   trailer, the deflate payload is below `MIN_PARALLEL_SIZE`, or
///   `num_threads` is below `MIN_THREADS_FOR_PARALLEL`.
/// * Any error produced by `confirm_resolve_write`.
pub fn decompress_parallel<W: Write>(
    gzip_data: &[u8],
    writer: &mut W,
    num_threads: usize,
) -> Result<u64, ParallelError> {
    // CRC32 (4 bytes) followed by ISIZE (4 bytes).
    const TRAILER_LEN: usize = 8;

    let started = std::time::Instant::now();

    let header_size =
        match crate::decompress::parallel::marker_decode::skip_gzip_header(gzip_data) {
            Ok(len) => len,
            Err(_) => return Err(ParallelError::InvalidHeader),
        };
    if gzip_data.len() < header_size + TRAILER_LEN {
        return Err(ParallelError::TooSmall);
    }

    let trailer_start = gzip_data.len() - TRAILER_LEN;
    let deflate_data = &gzip_data[header_size..trailer_start];
    let payload_large_enough = deflate_data.len() >= MIN_PARALLEL_SIZE;
    let enough_threads = num_threads >= MIN_THREADS_FOR_PARALLEL;
    if !(payload_large_enough && enough_threads) {
        return Err(ParallelError::TooSmall);
    }

    // Little-endian u32 reader for the two trailer fields.
    let le_u32_at = |offset: usize| -> u32 {
        u32::from_le_bytes([
            gzip_data[offset],
            gzip_data[offset + 1],
            gzip_data[offset + 2],
            gzip_data[offset + 3],
        ])
    };
    let expected_output = le_u32_at(trailer_start + 4) as usize;
    let expected_crc = le_u32_at(trailer_start);

    // One speculative chunk per requested thread, evenly spaced in bit space.
    let num_chunks = num_threads;
    let spacing_bits = (deflate_data.len() * 8) / num_chunks;

    if debug_enabled() {
        eprintln!(
            "[parallel_sm] speculation: {} bytes deflate, {} chunks, spacing={}KB, isize={}",
            deflate_data.len(),
            num_chunks,
            spacing_bits / 8 / 1024,
            expected_output
        );
    }

    let spec_timer = std::time::Instant::now();
    let speculative =
        speculative_decode_parallel(deflate_data, num_chunks, spacing_bits, expected_output);
    let spec_elapsed = spec_timer.elapsed();

    if debug_enabled() {
        let hits = speculative.iter().flatten().count();
        eprintln!(
            "[parallel_sm] speculative decode: {}/{} chunks found in {:.1}ms",
            hits,
            num_chunks,
            spec_elapsed.as_secs_f64() * 1000.0
        );
        for (i, spec) in speculative.iter().enumerate() {
            match spec {
                Some(s) => eprintln!(
                    " chunk {}: start_bit={} end_bit={} compressed_bits={}",
                    i,
                    s.start_bit,
                    s.end_bit,
                    s.end_bit - s.start_bit,
                ),
                None => eprintln!(" chunk {}: no boundary found", i),
            }
        }
    }

    let confirm_timer = std::time::Instant::now();
    let total_output = confirm_resolve_write(
        deflate_data,
        &speculative,
        expected_output,
        expected_crc,
        writer,
    )?;
    let confirm_elapsed = confirm_timer.elapsed();

    if debug_enabled() {
        let total_elapsed = started.elapsed();
        let total_mbps = total_output as f64 / total_elapsed.as_secs_f64() / 1e6;
        eprintln!(
            "[parallel_sm] spec={:.1}ms confirm={:.1}ms total={:.1}ms ({:.0} MB/s)",
            spec_elapsed.as_secs_f64() * 1000.0,
            confirm_elapsed.as_secs_f64() * 1000.0,
            total_elapsed.as_secs_f64() * 1000.0,
            total_mbps
        );
    }

    Ok(total_output as u64)
}
/// Bit range of one speculatively decoded region of the deflate stream.
struct SpeculativeChunk {
/// Bit offset, from the start of the deflate data, where decoding of this chunk began.
start_bit: usize,
/// Bit offset the decoder reported as the end of this chunk.
end_bit: usize,
}
/// Computes the output-size cap used when speculatively decoding one chunk.
///
/// Two estimates are formed and the larger one wins:
///
/// * twice the even share of `isize_total` across `num_chunks` (zero when
///   either value is zero), and
/// * eight times `compressed_bytes`, but never less than 64 KiB.
///
/// Both products saturate at `usize::MAX` instead of overflowing, so very
/// large inputs (notably on 32-bit targets) yield "no practical limit" rather
/// than a panic or a wrapped, too-small cap.
fn max_output_for_chunk(isize_total: usize, num_chunks: usize, compressed_bytes: usize) -> usize {
    const MIN_RATIO_CAP: usize = 64 * 1024;

    let isize_based = if num_chunks > 0 && isize_total > 0 {
        (isize_total / num_chunks).saturating_mul(2)
    } else {
        0
    };
    let ratio_based = compressed_bytes.saturating_mul(8).max(MIN_RATIO_CAP);
    ratio_based.max(isize_based)
}
/// Speculatively decodes `num_chunks` regions of `deflate_data` on scoped threads.
///
/// Chunk 0 is decoded from bit 0; every other chunk `i` first searches forward
/// from bit `i * spacing_bits` for a plausible block start. The returned vector
/// has one entry per chunk index, `None` where no chunk could be established.
fn speculative_decode_parallel(
    deflate_data: &[u8],
    num_chunks: usize,
    spacing_bits: usize,
    isize_total: usize,
) -> Vec<Option<SpeculativeChunk>> {
    let slots: Vec<Mutex<Option<SpeculativeChunk>>> =
        std::iter::repeat_with(|| Mutex::new(None))
            .take(num_chunks)
            .collect();
    let next_task = AtomicUsize::new(0);
    let bytes_per_chunk = deflate_data.len() / num_chunks;
    let output_cap = max_output_for_chunk(isize_total, num_chunks, bytes_per_chunk);

    std::thread::scope(|scope| {
        for _ in 0..num_chunks {
            scope.spawn(|| {
                // Each worker keeps claiming chunk indices until they run out.
                loop {
                    let idx = next_task.fetch_add(1, Ordering::Relaxed);
                    if idx >= num_chunks {
                        break;
                    }
                    let partition_bit = idx * spacing_bits;
                    let found = match idx {
                        0 => find_chunk_end(deflate_data, 0, output_cap).map(|end_bit| {
                            SpeculativeChunk {
                                start_bit: 0,
                                end_bit,
                            }
                        }),
                        _ => search_and_find(deflate_data, partition_bit, output_cap),
                    };
                    if found.is_some() {
                        *slots[idx].lock().unwrap() = found;
                    }
                }
            });
        }
    });

    let mut collected = Vec::with_capacity(num_chunks);
    for slot in slots {
        collected.push(slot.into_inner().unwrap());
    }
    collected
}
/// Decodes from `start_bit` with an empty window and returns only the end bit
/// reported by the decoder, or `None` if the decode fails.
fn find_chunk_end(deflate_data: &[u8], start_bit: usize, max_output: usize) -> Option<usize> {
    let no_window: &[u8] = &[];
    crate::backends::inflate_bit::decompress_deflate_from_bit_with_end(
        deflate_data,
        start_bit,
        no_window,
        max_output,
    )
    .map(|(_, end_bit)| end_bit)
}
/// Locates a block start at or after `from_bit`, then decodes from it to learn
/// where the chunk ends. Yields `None` if either step fails.
fn search_and_find(
    deflate_data: &[u8],
    from_bit: usize,
    max_output: usize,
) -> Option<SpeculativeChunk> {
    search_boundary_forward(deflate_data, from_bit).and_then(|start_bit| {
        find_chunk_end(deflate_data, start_bit, max_output)
            .map(|end_bit| SpeculativeChunk { start_bit, end_bit })
    })
}
/// Searches forward from `from_bit` for a bit offset that `try_decode_at` accepts.
///
/// The range examined is at most `SEARCH_RADIUS` bytes long and never extends
/// past the end of `deflate_data`. `BlockFinder` candidates are tried first, in
/// ascending offset order, one 8 KiB sub-range at a time. If none is accepted,
/// every 8th bit of the first 128 KiB of the range is tried directly.
fn search_boundary_forward(deflate_data: &[u8], from_bit: usize) -> Option<usize> {
    const SUB_CHUNK_BITS: usize = 8 * 1024 * 8;
    const BRUTE_FORCE_BITS: usize = 128 * 1024 * 8;

    let search_end = (from_bit + SEARCH_RADIUS * 8).min(deflate_data.len() * 8);
    if from_bit >= search_end {
        return None;
    }

    let finder = BlockFinder::new(deflate_data);
    for window_start in (from_bit..search_end).step_by(SUB_CHUNK_BITS) {
        let window_end = (window_start + SUB_CHUNK_BITS).min(search_end);
        let mut candidates = finder.find_blocks(window_start, window_end);
        candidates.sort_by_key(|block| block.bit_offset);
        let accepted = candidates
            .iter()
            .map(|block| block.bit_offset)
            .find(|&bit| try_decode_at(deflate_data, bit));
        if accepted.is_some() {
            return accepted;
        }
    }

    // Fallback: probe every 8th bit directly.
    let brute_end = (from_bit + BRUTE_FORCE_BITS).min(search_end);
    let mut probe = (from_bit..brute_end).step_by(8);
    probe.find(|&bit| try_decode_at(deflate_data, bit))
}
/// Tests whether `bit_offset` looks like a usable decode start.
///
/// A trial decode with an empty window must succeed and yield at least 32 KiB
/// of output when more than 128 KiB of input remains, or 4 KiB otherwise.
/// Offsets at or beyond the end of the data are rejected outright.
fn try_decode_at(deflate_data: &[u8], bit_offset: usize) -> bool {
    let Some(remaining) = deflate_data
        .len()
        .checked_sub(bit_offset / 8)
        .filter(|&left| left > 0)
    else {
        return false;
    };

    let min_output = match remaining > 128 * 1024 {
        true => 32 * 1024,
        false => 4 * 1024,
    };

    let trial = crate::backends::inflate_bit::decompress_deflate_from_bit(
        deflate_data,
        bit_offset,
        &[],
        min_output,
    );
    match trial {
        Some(out) => out.len() >= min_output,
        None => false,
    }
}
fn confirm_resolve_write<W: Write>(
deflate_data: &[u8],
speculative: &[Option<SpeculativeChunk>],
expected_size: usize,
expected_crc: u32,
writer: &mut W,
) -> Result<usize, ParallelError> {
let mut buffer = Vec::with_capacity(expected_size);
let mut window = Vec::<u8>::new();
let mut confirmed_bit: usize = 0;
let total_bits = deflate_data.len() * 8;
let mut spec_by_start: std::collections::HashMap<usize, usize> =
std::collections::HashMap::new();
for (i, spec) in speculative.iter().enumerate() {
if let Some(chunk) = spec {
spec_by_start.insert(chunk.start_bit, i);
}
}
loop {
if expected_size > 0 && buffer.len() >= expected_size {
break;
}
if confirmed_bit >= total_bits {
break;
}
if let Some(&idx) = spec_by_start.get(&confirmed_bit) {
let chunk = speculative[idx].as_ref().unwrap();
if debug_enabled() {
eprintln!(
"[parallel_sm] confirm: HIT at bit {} (chunk {}), compressed_bits={}",
confirmed_bit,
idx,
chunk.end_bit - chunk.start_bit,
);
}
let chunk_end_byte = chunk.end_bit.div_ceil(8).min(deflate_data.len());
let chunk_input = &deflate_data[..chunk_end_byte];
match crate::backends::inflate_bit::decompress_deflate_from_bit_with_end(
chunk_input,
chunk.start_bit,
&window,
expected_size.saturating_sub(buffer.len()),
) {
Some((decoded, actual_end_bit)) => {
update_window(&mut window, &decoded);
buffer.extend_from_slice(&decoded);
confirmed_bit = snap_to_nearest_spec(actual_end_bit, &spec_by_start);
}
None => return Err(ParallelError::DecodeFailed),
}
} else {
let (bytes, end_bit) = decode_sequential_to_spec(
deflate_data,
confirmed_bit,
&window,
expected_size.saturating_sub(buffer.len()),
&spec_by_start,
)?;
if debug_enabled() && end_bit != confirmed_bit {
let found_spec = spec_by_start.contains_key(&end_bit);
eprintln!(
"[parallel_sm] confirm: sequential {} → {} ({} bytes){}",
confirmed_bit,
end_bit,
bytes.len(),
if found_spec { " → spec match!" } else { "" }
);
}
if end_bit == confirmed_bit {
break;
}
buffer.extend_from_slice(&bytes);
update_window(&mut window, &bytes);
confirmed_bit = end_bit;
}
}
verify_output(&buffer, expected_size, expected_crc)?;
writer.write_all(&buffer)?;
Ok(buffer.len())
}
/// Decodes sequentially from `start_bit` using `window` as history.
///
/// Returns the decoded bytes and the end bit reported by the decoder. A start
/// position at or past the end of the data yields an empty result with the
/// position unchanged. The `_spec_by_start` argument is not consulted.
///
/// # Errors
///
/// `ParallelError::DecodeFailed` when the `inflate_bit` backend is unavailable
/// or the decode itself fails.
fn decode_sequential_to_spec(
    deflate_data: &[u8],
    start_bit: usize,
    window: &[u8],
    max_output: usize,
    _spec_by_start: &std::collections::HashMap<usize, usize>,
) -> Result<(Vec<u8>, usize), ParallelError> {
    if start_bit / 8 >= deflate_data.len() {
        return Ok((Vec::new(), start_bit));
    }
    if !crate::backends::inflate_bit::is_available() {
        return Err(ParallelError::DecodeFailed);
    }

    let (decoded, end_bit) = crate::backends::inflate_bit::decompress_deflate_from_bit_with_end(
        deflate_data,
        start_bit,
        window,
        max_output,
    )
    .ok_or(ParallelError::DecodeFailed)?;

    if debug_enabled() {
        eprintln!(
            "[parallel_sm] sequential fast: {} → {} ({} bytes)",
            start_bit,
            end_bit,
            decoded.len()
        );
    }
    Ok((decoded, end_bit))
}
/// Snaps `bit` back to a speculative chunk start lying 1 to 7 bits before it.
///
/// Candidates are checked closest-first. If none of them is a key of
/// `spec_by_start`, `bit` is returned unchanged.
fn snap_to_nearest_spec(
    bit: usize,
    spec_by_start: &std::collections::HashMap<usize, usize>,
) -> usize {
    (1usize..=7)
        .filter_map(|back| bit.checked_sub(back))
        .find(|earlier| spec_by_start.contains_key(earlier))
        .unwrap_or(bit)
}
/// Checks the decoded `buffer` against the expected length and CRC32.
///
/// An `expected_size` of zero skips the length check, and an `expected_crc`
/// of zero skips the checksum check.
///
/// # Errors
///
/// `ParallelError::SizeMismatch` or `ParallelError::CrcMismatch` for the
/// first check that fails.
fn verify_output(
    buffer: &[u8],
    expected_size: usize,
    expected_crc: u32,
) -> Result<(), ParallelError> {
    let size_matches = expected_size == 0 || buffer.len() == expected_size;
    if !size_matches {
        if debug_enabled() {
            eprintln!(
                "[parallel_sm] output size mismatch: got {} expected {}",
                buffer.len(),
                expected_size
            );
        }
        return Err(ParallelError::SizeMismatch);
    }

    if expected_crc == 0 {
        return Ok(());
    }

    let actual_crc = {
        let mut digest = crc32fast::Hasher::new();
        digest.update(buffer);
        digest.finalize()
    };
    if actual_crc == expected_crc {
        return Ok(());
    }

    if debug_enabled() {
        eprintln!(
            "[parallel_sm] CRC32 mismatch: got {:#010x} expected {:#010x}",
            actual_crc, expected_crc
        );
    }
    Err(ParallelError::CrcMismatch)
}
/// Returns the smallest speculative start bit strictly between `after_bit`
/// and `total_bits`, or `total_bits` when there is none.
#[cfg(test)]
fn find_next_spec_start(
    spec_by_start: &std::collections::HashMap<usize, usize>,
    after_bit: usize,
    total_bits: usize,
) -> usize {
    spec_by_start
        .keys()
        .copied()
        .filter(|&start| after_bit < start && start < total_bits)
        .min()
        .unwrap_or(total_bits)
}
/// Decodes from `start_bit`, optionally hiding all input beyond `until_bit`
/// (rounded up to a whole byte and clamped to the data length).
///
/// # Errors
///
/// `ParallelError::DecodeFailed` when the decoder returns nothing.
#[cfg(test)]
fn decode_sequential_from(
    deflate_data: &[u8],
    start_bit: usize,
    until_bit: Option<usize>,
    window: &[u8],
    max_output: usize,
) -> Result<(Vec<u8>, usize), ParallelError> {
    let visible_len = match until_bit {
        Some(bit) => bit.div_ceil(8).min(deflate_data.len()),
        None => deflate_data.len(),
    };
    let visible = &deflate_data[..visible_len];
    match crate::backends::inflate_bit::decompress_deflate_from_bit_with_end(
        visible, start_bit, window, max_output,
    ) {
        Some(decoded) => Ok(decoded),
        None => Err(ParallelError::DecodeFailed),
    }
}
/// Appends `new_data` to the history `window`, keeping only the most recent
/// `WINDOW_SIZE` bytes of the combined sequence.
fn update_window(window: &mut Vec<u8>, new_data: &[u8]) {
    let incoming = new_data.len();

    // The new data alone fills the window: older history is irrelevant.
    if incoming >= WINDOW_SIZE {
        window.clear();
        window.extend_from_slice(&new_data[incoming - WINDOW_SIZE..]);
        return;
    }

    // Drop just enough of the oldest bytes to make room for the new ones.
    let excess = (window.len() + incoming).saturating_sub(WINDOW_SIZE);
    if excess > 0 {
        window.drain(..excess);
    }
    window.extend_from_slice(new_data);
}
/// Reasons the parallel decode path can fail.
#[derive(Debug)]
pub enum ParallelError {
    /// The gzip header could not be skipped.
    InvalidHeader,
    /// The input or the thread count does not meet the minimums for this path.
    TooSmall,
    /// A decode step returned no result.
    DecodeFailed,
    /// The decoded length differs from the expected size.
    SizeMismatch,
    /// The decoded data's CRC32 differs from the expected checksum.
    CrcMismatch,
    /// Writing the output failed.
    Io(io::Error),
}

impl From<io::Error> for ParallelError {
    /// Wraps an I/O error so that `?` can be used on writer calls.
    fn from(e: io::Error) -> Self {
        Self::Io(e)
    }
}

impl ParallelError {
    /// Returns `true` only for `TooSmall`.
    pub fn is_routing(&self) -> bool {
        matches!(self, Self::TooSmall)
    }
}

impl std::fmt::Display for ParallelError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let fixed = match self {
            Self::Io(e) => return write!(f, "I/O error: {}", e),
            Self::InvalidHeader => "invalid gzip header",
            Self::TooSmall => "file too small for parallel decode",
            Self::DecodeFailed => "chunk decode failed",
            Self::SizeMismatch => "output size mismatch",
            Self::CrcMismatch => "CRC32 mismatch",
        };
        f.write_str(fixed)
    }
}

/// Lets `ParallelError` be used as `Box<dyn Error>` and exposes the wrapped
/// I/O error through `source()`.
impl std::error::Error for ParallelError {
    fn source(&self) -> Option<&(dyn std::error::Error + 'static)> {
        match self {
            Self::Io(e) => Some(e),
            Self::InvalidHeader
            | Self::TooSmall
            | Self::DecodeFailed
            | Self::SizeMismatch
            | Self::CrcMismatch => None,
        }
    }
}
#[cfg(test)]
mod tests {
use super::*;
use crate::decompress::parallel::marker_decode::{replace_markers, MarkerDecoder};
/// Test-only record of the bit range covered by one decoded chunk.
struct DecodedChunk {
/// Bit offset at which decoding of the chunk started.
start_bit: usize,
/// Bit offset reported by `find_chunk_end` for the chunk.
end_bit: usize,
}
/// Output cap derived from the compressed size alone: the ISIZE-based
/// estimate is disabled by passing zero for both of its inputs.
fn max_output_for_chunk_compat(compressed_bytes: usize) -> usize {
    let (no_isize, no_chunks) = (0, 0);
    max_output_for_chunk(no_isize, no_chunks, compressed_bytes)
}
/// Returns bit 0 followed by every boundary that `search_boundary_forward`
/// finds at the evenly spaced partition points 1..`num_chunks`.
/// Partitions without a boundary are simply left out.
fn find_chunk_boundaries(deflate: &[u8], num_chunks: usize) -> Vec<usize> {
    let spacing = (deflate.len() * 8) / num_chunks;
    std::iter::once(0)
        .chain((1..num_chunks).filter_map(|i| search_boundary_forward(deflate, i * spacing)))
        .collect()
}
/// Determines the end bit of every chunk that starts at one of `boundaries`.
///
/// Each chunk except the last only sees input up to the next boundary
/// (rounded up to a whole byte). Despite the name the chunks are processed one
/// after another; `_num_threads` is ignored.
///
/// # Errors
///
/// `ParallelError::DecodeFailed` as soon as one chunk cannot be decoded.
fn decode_chunks_parallel(
    deflate: &[u8],
    boundaries: &[usize],
    _num_threads: usize,
) -> Result<Vec<DecodedChunk>, ParallelError> {
    let isize_est = deflate.len() * 4;
    let count = boundaries.len();

    boundaries
        .iter()
        .enumerate()
        .map(|(i, &start_bit)| {
            let cap = max_output_for_chunk(isize_est, count, deflate.len() / count);
            let visible = match boundaries.get(i + 1) {
                Some(&next) => &deflate[..next.div_ceil(8).min(deflate.len())],
                None => deflate,
            };
            match find_chunk_end(visible, start_bit, cap) {
                Some(end_bit) => Ok(DecodedChunk { start_bit, end_bit }),
                None => Err(ParallelError::DecodeFailed),
            }
        })
        .collect()
}
fn resolve_and_write<W: Write>(
deflate: &[u8],
chunks: &[DecodedChunk],
writer: &mut W,
expected_size: usize,
expected_crc: u32,
) -> Result<usize, ParallelError> {
let mut buffer = Vec::with_capacity(expected_size);
let mut window = Vec::new();
for chunk in chunks {
let chunk_end_byte = chunk.end_bit.div_ceil(8).min(deflate.len());
let chunk_input = &deflate[..chunk_end_byte];
let (decoded, _) = crate::backends::inflate_bit::decompress_deflate_from_bit_with_end(
chunk_input,
chunk.start_bit,
&window,
expected_size,
)
.ok_or(ParallelError::DecodeFailed)?;
update_window(&mut window, &decoded);
buffer.extend_from_slice(&decoded);
}
verify_output(&buffer, expected_size, expected_crc)?;
writer.write_all(&buffer)?;
Ok(buffer.len())
}
/// Gzip-compresses `data` in memory at flate2's default compression level.
fn make_gzip_data(data: &[u8]) -> Vec<u8> {
    let level = flate2::Compression::default();
    let mut gz = flate2::write::GzEncoder::new(Vec::new(), level);
    std::io::Write::write_all(&mut gz, data).unwrap();
    gz.finish().unwrap()
}
/// Produces `size` bytes of deterministic, moderately compressible data.
///
/// A fixed-seed linear congruential generator drives the output: three times
/// out of five a single pseudo-random byte is emitted, otherwise a run of 2 to
/// 9 copies of a lowercase ASCII letter.
fn make_compressible_data(size: usize) -> Vec<u8> {
    let mut out = Vec::with_capacity(size);
    let mut state: u64 = 0xdeadbeef;

    while out.len() < size {
        state = state.wrapping_mul(6364136223846793005).wrapping_add(1);

        let selector = (state >> 32) % 5;
        if selector < 3 {
            out.push((state >> 16) as u8);
            continue;
        }

        let letter = ((state >> 24) % 26 + b'a' as u64) as u8;
        let run = ((state >> 40) % 8 + 2) as usize;
        let room = size - out.len();
        out.extend(std::iter::repeat(letter).take(run.min(room)));
    }

    out.truncate(size);
    out
}
/// Splits `gzip_data` into its raw deflate payload and the reference output
/// obtained by inflating that payload with `inflate_consume_first`.
///
/// Panics if the header is invalid or the reference inflate fails.
fn get_deflate_and_expected(gzip_data: &[u8]) -> (&[u8], Vec<u8>) {
    let header_len = crate::decompress::parallel::marker_decode::skip_gzip_header(gzip_data)
        .expect("valid header");
    let payload = &gzip_data[header_len..gzip_data.len() - 8];

    // Scratch buffer sized at ten times the payload plus 64 KiB of slack.
    let mut reference = vec![0u8; payload.len() * 10 + 65536];
    let produced = crate::decompress::inflate::consume_first_decode::inflate_consume_first(
        payload,
        &mut reference,
    )
    .expect("reference inflate");
    reference.truncate(produced);

    (payload, reference)
}
/// A tiny input must be rejected with `TooSmall` rather than decoded.
#[test]
fn test_parallel_small_falls_back() {
    let gz = make_gzip_data(b"hello world");
    let mut sink = Vec::new();
    let outcome = decompress_parallel(&gz, &mut sink, 4);
    assert!(matches!(outcome, Err(ParallelError::TooSmall)));
}
/// `try_decode_at` must accept fewer than 20 of 100 pseudo-random bit offsets.
#[test]
fn test_try_decode_rejects_random_positions() {
    const SAMPLES: usize = 100;

    let plain = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&plain);
    let header_len = crate::decompress::parallel::marker_decode::skip_gzip_header(&gz)
        .expect("valid header");
    let deflate = &gz[header_len..gz.len() - 8];
    let total_bits = deflate.len() * 8;

    let mut state: u64 = 12345;
    let accepted = (0..SAMPLES)
        .filter(|_| {
            state = state.wrapping_mul(6364136223846793005).wrapping_add(1);
            try_decode_at(deflate, (state as usize) % total_bits)
        })
        .count();

    eprintln!(
        "random positions: {}/{} accepted ({:.1}%)",
        accepted,
        SAMPLES,
        accepted as f64 / SAMPLES as f64 * 100.0
    );
    assert!(
        accepted < 20,
        "try_decode accepted {} of 100 random positions — too permissive",
        accepted
    );
}
/// Every checkpoint reported by `scan_deflate_fast` in the first 90% of the
/// stream must be accepted by `try_decode_at`.
#[test]
fn test_try_decode_accepts_oracle_boundaries() {
let data = make_compressible_data(8 * 1024 * 1024);
let compressed = make_gzip_data(&data);
let header_size = crate::decompress::parallel::marker_decode::skip_gzip_header(&compressed)
.expect("valid header");
let deflate = &compressed[header_size..compressed.len() - 8];
let scan = crate::decompress::scan_inflate::scan_deflate_fast(deflate, 512 * 1024, 0)
.expect("scan");
let total_bits = deflate.len() * 8;
// Checkpoints at or beyond 90% of the stream are excluded from the check.
let cutoff = total_bits * 9 / 10; let mut accepted = 0;
let mut total = 0;
for cp in &scan.checkpoints {
// Only the low 8 bits of `bitsleft` are used when converting to a bit position.
let real_bitsleft = (cp.bitsleft as u8) as usize;
let bit_pos = cp.input_byte_pos * 8 - real_bitsleft;
if bit_pos >= cutoff {
continue;
}
total += 1;
if try_decode_at(deflate, bit_pos) {
accepted += 1;
} else {
eprintln!(
"REJECTED known boundary at bit {} ({:.1}% into stream)",
bit_pos,
bit_pos as f64 / total_bits as f64 * 100.0
);
}
}
eprintln!(
"oracle boundaries (excluding last 10%): {}/{} accepted ({:.1}%)",
accepted,
total,
if total > 0 {
accepted as f64 / total as f64 * 100.0
} else {
0.0
}
);
assert_eq!(
accepted,
total,
"try_decode rejected {} of {} oracle boundaries",
total - accepted,
total
);
}
/// Counts how many scan checkpoints have a `BlockFinder` candidate within
/// 1024 bits that `try_decode_at` accepts, and prints the tally.
/// This test only reports; it makes no assertion.
#[test]
fn test_block_finder_with_try_decode_finds_boundaries() {
let data = make_compressible_data(8 * 1024 * 1024);
let compressed = make_gzip_data(&data);
let header_size = crate::decompress::parallel::marker_decode::skip_gzip_header(&compressed)
.expect("valid header");
let deflate = &compressed[header_size..compressed.len() - 8];
let scan = crate::decompress::scan_inflate::scan_deflate_fast(deflate, 512 * 1024, 0)
.expect("scan");
let finder = BlockFinder::new(deflate);
let mut found_real = 0;
for cp in &scan.checkpoints {
// Only the low 8 bits of `bitsleft` are used when converting to a bit position.
let real_bitsleft = (cp.bitsleft as u8) as usize;
let real_bit = cp.input_byte_pos * 8 - real_bitsleft;
// Search a SEARCH_RADIUS-wide range on both sides of the checkpoint.
let search_start = real_bit.saturating_sub(SEARCH_RADIUS * 8);
let search_end = (real_bit + SEARCH_RADIUS * 8).min(deflate.len() * 8);
let candidates = finder.find_blocks(search_start, search_end);
let mut hit = false;
for c in &candidates {
if c.bit_offset.abs_diff(real_bit) < 1024 && try_decode_at(deflate, c.bit_offset) {
hit = true;
break;
}
}
if hit {
found_real += 1;
}
}
eprintln!(
"block_finder + try_decode: {}/{} real boundaries found",
found_real,
scan.checkpoints.len()
);
}
/// Decodes each chunk between found boundaries with `MarkerDecoder` and checks
/// that its output does not exceed twice the cap from
/// `max_output_for_chunk_compat`. Skips when fewer than two boundaries exist.
#[test]
fn test_chunk_output_bounded() {
let data = make_compressible_data(8 * 1024 * 1024);
let compressed = make_gzip_data(&data);
let header_size = crate::decompress::parallel::marker_decode::skip_gzip_header(&compressed)
.expect("valid header");
let deflate = &compressed[header_size..compressed.len() - 8];
let boundaries = find_chunk_boundaries(deflate, 4);
if boundaries.len() < 2 {
eprintln!("not enough boundaries, skipping");
return;
}
for i in 0..boundaries.len() {
let start_bit = boundaries[i];
let start_byte = start_bit / 8;
let relative_start_bit = start_bit % 8;
let is_last = i + 1 >= boundaries.len();
// Non-final chunks get the input up to the next boundary plus 64 KiB of slack.
let end_byte = if is_last {
deflate.len()
} else {
let next_byte = boundaries[i + 1] / 8;
(next_byte + 64 * 1024).min(deflate.len())
};
let compressed_bytes = end_byte - start_byte;
let max_output = max_output_for_chunk_compat(compressed_bytes);
let data_slice = &deflate[start_byte..end_byte];
let mut decoder = MarkerDecoder::new(data_slice, relative_start_bit);
// Decode results are deliberately ignored; only the output length is inspected.
if is_last {
let _ = decoder.decode_until(max_output);
} else {
let next_boundary = boundaries[i + 1];
let relative_end_bit = next_boundary - start_byte * 8;
let _ = decoder.decode_until_bit(max_output, relative_end_bit);
}
let output_size = decoder.output().len();
eprintln!(
"chunk {}: {} compressed bytes → {} output ({:.1}x), limit={}",
i,
compressed_bytes,
output_size,
if compressed_bytes > 0 {
output_size as f64 / compressed_bytes as f64
} else {
0.0
},
max_output
);
let tolerance = max_output * 2;
assert!(
output_size <= tolerance,
"chunk {} produced {} bytes from {} compressed ({:.1}x) — exceeded tolerance {}",
i,
output_size,
compressed_bytes,
output_size as f64 / compressed_bytes as f64,
tolerance
);
}
}
/// Decodes from bit 0 up to the first found boundary and checks that the
/// decoder reports zero markers and that its output matches the reference
/// inflate over their common length. Skips when fewer than two boundaries exist.
#[test]
fn test_chunk0_no_markers_matches_sequential() {
let data = make_compressible_data(8 * 1024 * 1024);
let compressed = make_gzip_data(&data);
let header_size = crate::decompress::parallel::marker_decode::skip_gzip_header(&compressed)
.expect("valid header");
let deflate = &compressed[header_size..compressed.len() - 8];
let boundaries = find_chunk_boundaries(deflate, 4);
if boundaries.len() < 2 {
eprintln!("not enough boundaries, skipping");
return;
}
let end_bit = boundaries[1];
let mut decoder = MarkerDecoder::new(deflate, 0);
decoder
.decode_until_bit(usize::MAX, end_bit)
.expect("chunk 0 decode");
assert_eq!(
decoder.marker_count(),
0,
"chunk 0 should have no markers (starts from beginning)"
);
let mut ref_output = vec![0u8; data.len() + 65536];
crate::decompress::inflate::consume_first_decode::inflate_consume_first(
deflate,
&mut ref_output,
)
.expect("reference inflate");
// Each decoder output element is narrowed to a byte for comparison.
let chunk0_bytes: Vec<u8> = decoder.output().iter().map(|&v| v as u8).collect();
let cmp_len = chunk0_bytes.len().min(ref_output.len());
assert_eq!(
&chunk0_bytes[..cmp_len],
&ref_output[..cmp_len],
"chunk 0 output doesn't match sequential reference"
);
eprintln!(
"chunk 0: {} bytes, matches sequential reference",
chunk0_bytes.len()
);
}
/// Decodes the second chunk twice, once from a slice bounded near the third
/// boundary and once from the full tail of the stream. Only the bounded
/// output size is asserted (at most twice the cap); a much larger full-tail
/// output is merely logged as a warning. Skips when fewer than three
/// boundaries exist.
#[test]
fn test_bounded_slice_not_full_tail() {
let data = make_compressible_data(8 * 1024 * 1024);
let compressed = make_gzip_data(&data);
let header_size = crate::decompress::parallel::marker_decode::skip_gzip_header(&compressed)
.expect("valid header");
let deflate = &compressed[header_size..compressed.len() - 8];
let boundaries = find_chunk_boundaries(deflate, 4);
if boundaries.len() < 3 {
eprintln!("not enough boundaries, skipping");
return;
}
let start_bit = boundaries[1];
let start_byte = start_bit / 8;
let next_bit = boundaries[2];
// The bounded slice ends 64 KiB past the next boundary, clamped to the data length.
let end_byte = (next_bit / 8 + 64 * 1024).min(deflate.len());
let compressed_bytes = end_byte - start_byte;
let max_out = max_output_for_chunk_compat(compressed_bytes);
let relative_start = start_bit % 8;
let bounded_slice = &deflate[start_byte..end_byte];
let mut bounded_decoder = MarkerDecoder::new(bounded_slice, relative_start);
let _ = bounded_decoder.decode_until(max_out);
let bounded_output = bounded_decoder.output().len();
let full_tail = &deflate[start_byte..];
let mut tail_decoder = MarkerDecoder::new(full_tail, relative_start);
let _ = tail_decoder.decode_until(max_out);
let tail_output = tail_decoder.output().len();
eprintln!(
"bounded slice: {} output, full tail: {} output, max_out: {}",
bounded_output, tail_output, max_out
);
let tolerance = max_out * 2;
assert!(
bounded_output <= tolerance,
"bounded decode wildly exceeded max: {} > {} (2x limit)",
bounded_output,
tolerance
);
// Informational only: this condition never fails the test.
if tail_output > bounded_output * 3 && tail_output > 1024 * 1024 {
eprintln!(
"WARNING: full-tail produced {}x more than bounded ({} vs {}) — \
possible &data[start..] regression",
tail_output / bounded_output.max(1),
tail_output,
bounded_output
);
}
}
/// Round-trips 40 MiB of generated data through gzip and `decompress_parallel`.
///
/// On success the output must equal the input byte for byte. If the parallel
/// path returns an error, flate2's sequential decoder is checked instead.
#[test]
fn test_e2e_roundtrip_strict() {
    let original = make_compressible_data(40 * 1024 * 1024);
    let gz = make_gzip_data(&original);
    eprintln!(
        "e2e strict: {} bytes → {} bytes ({:.1}%)",
        original.len(),
        gz.len(),
        gz.len() as f64 / original.len() as f64 * 100.0
    );
    if gz.len() < MIN_PARALLEL_SIZE {
        eprintln!("compressed data too small, skipping");
        return;
    }

    let mut output = Vec::new();
    let bytes = match decompress_parallel(&gz, &mut output, 10) {
        Ok(written) => written,
        Err(e) => {
            eprintln!("e2e strict: fell back ({})", e);
            let mut seq_out = Vec::new();
            let mut reference = flate2::read::GzDecoder::new(&gz[..]);
            std::io::Read::read_to_end(&mut reference, &mut seq_out).unwrap();
            assert_eq!(seq_out, original, "sequential decode mismatch");
            return;
        }
    };

    assert_eq!(
        bytes as usize,
        original.len(),
        "output size mismatch: got {} expected {}",
        bytes,
        original.len()
    );
    assert_eq!(output, original, "output content mismatch");
    eprintln!("e2e strict: PASS ({} bytes)", bytes);
}
/// Runs `decompress_parallel` on the Silesia benchmark archive when it is
/// present and compares the result with the reference inflate. A missing
/// file or an error from the parallel path is logged, not failed.
#[test]
fn test_parallel_silesia() {
    let Ok(gz) = std::fs::read("benchmark_data/silesia-gzip.tar.gz") else {
        eprintln!("skipping (silesia not found)");
        return;
    };
    let (_deflate, ref_output) = get_deflate_and_expected(&gz);
    let ref_size = ref_output.len();

    let mut par_output = Vec::new();
    let timer = std::time::Instant::now();
    let result = decompress_parallel(&gz, &mut par_output, 10);
    let elapsed = timer.elapsed();

    let bytes = match result {
        Ok(written) => written,
        Err(e) => {
            eprintln!("parallel silesia fell back: {}", e);
            return;
        }
    };

    assert_eq!(bytes as usize, ref_size, "size mismatch");
    assert_eq!(par_output, ref_output, "content mismatch");
    let mbps = ref_size as f64 / elapsed.as_secs_f64() / 1e6;
    eprintln!(
        "parallel silesia: {} bytes in {:.1}ms ({:.0} MB/s)",
        ref_size,
        elapsed.as_secs_f64() * 1000.0,
        mbps
    );
}
/// Checks that every chunk returned by `decode_chunks_parallel` covers a
/// non-empty bit range, then prints the summed compressed bits and the
/// expected output size. The totals are only printed, not asserted.
/// Skips when too few boundaries are found or decoding fails.
#[test]
fn test_chunk_output_sum_matches_isize() {
let data = make_compressible_data(8 * 1024 * 1024);
let compressed = make_gzip_data(&data);
let header_size = crate::decompress::parallel::marker_decode::skip_gzip_header(&compressed)
.expect("valid header");
let deflate = &compressed[header_size..compressed.len() - 8];
let expected_size = data.len();
let boundaries = find_chunk_boundaries(deflate, 4);
if boundaries.len() < 2 {
eprintln!("not enough boundaries, skipping");
return;
}
let chunks = match decode_chunks_parallel(deflate, &boundaries, 4) {
Ok(c) => c,
Err(e) => {
eprintln!("decode failed: {}, skipping", e);
return;
}
};
for (i, chunk) in chunks.iter().enumerate() {
assert!(
chunk.end_bit > chunk.start_bit,
"chunk {} has zero-length bit range: start={} end={}",
i,
chunk.start_bit,
chunk.end_bit
);
}
let total_compressed_bits: usize = chunks.iter().map(|c| c.end_bit - c.start_bit).sum();
eprintln!(
"chunk compressed bits: {}, deflate total bits: {}",
total_compressed_bits,
deflate.len() * 8
);
eprintln!(
"expected output: {}, chunks found: {}",
expected_size,
chunks.len()
);
}
/// Found boundaries must start at bit 0, be strictly increasing, and all lie
/// inside the deflate data.
#[test]
fn test_boundaries_sorted_and_unique() {
    let plain = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&plain);
    let header_len = crate::decompress::parallel::marker_decode::skip_gzip_header(&gz)
        .expect("valid header");
    let deflate = &gz[header_len..gz.len() - 8];
    let boundaries = find_chunk_boundaries(deflate, 4);

    assert_eq!(boundaries[0], 0, "first boundary must be bit 0");

    for (prev_idx, pair) in boundaries.windows(2).enumerate() {
        let (prev, next) = (pair[0], pair[1]);
        assert!(
            next > prev,
            "boundaries not strictly sorted: [{}]={} >= [{}]={}",
            prev_idx,
            prev,
            prev_idx + 1,
            next
        );
    }

    let max_bit = deflate.len() * 8;
    for (i, b) in boundaries.iter().copied().enumerate() {
        assert!(
            b < max_bit,
            "boundary {} at bit {} exceeds data size {} bits",
            i,
            b,
            max_bit
        );
    }
    eprintln!("boundaries: {:?}", boundaries);
}
/// Returns a buffer of `size` zero bytes.
fn make_all_zeros(size: usize) -> Vec<u8> {
    std::iter::repeat(0u8).take(size).collect()
}
/// Generates `size` pseudo-random bytes from a linear congruential generator
/// started at `seed`; the same seed always yields the same bytes.
fn make_random_data(size: usize, seed: u64) -> Vec<u8> {
    let mut state = seed;
    (0..size)
        .map(|_| {
            state = state.wrapping_mul(6364136223846793005).wrapping_add(1);
            (state >> 32) as u8
        })
        .collect()
}
/// Builds `size` bytes by repeating a fixed English pangram, cutting the last
/// repetition short where necessary.
fn make_text_data(size: usize) -> Vec<u8> {
    const PHRASE: &[u8] = b"the quick brown fox jumps over the lazy dog ";
    PHRASE.iter().copied().cycle().take(size).collect()
}
/// Generates `size` bytes made of runs: each run repeats one pseudo-random
/// byte between 10 and 209 times, with the final run trimmed to fit.
fn make_rle_data(size: usize) -> Vec<u8> {
    let mut out = Vec::with_capacity(size);
    let mut state: u64 = 0xaabbccdd;

    while out.len() < size {
        state = state.wrapping_mul(6364136223846793005).wrapping_add(1);
        let value = (state >> 32) as u8;
        let run = ((state >> 48) % 200 + 10) as usize;
        let room = size - out.len();
        out.extend(std::iter::repeat(value).take(run.min(room)));
    }

    out.truncate(size);
    out
}
/// Concatenates four kinds of data: zeros, repeated text and run-length data
/// each fill a quarter of `size`, and pseudo-random bytes (seed 42) fill
/// whatever remains.
fn make_mixed_data(size: usize) -> Vec<u8> {
    let quarter = size / 4;
    let mut out = Vec::with_capacity(size);
    out.extend(make_all_zeros(quarter));
    out.extend(make_text_data(quarter));
    out.extend(make_rle_data(quarter));
    let rest = size - out.len();
    out.extend(make_random_data(rest, 42));
    out
}
/// Gzip-compresses `data` in memory at the given flate2 compression `level`.
fn make_gzip_at_level(data: &[u8], level: u32) -> Vec<u8> {
    let compression = flate2::Compression::new(level);
    let mut gz = flate2::write::GzEncoder::new(Vec::new(), compression);
    std::io::Write::write_all(&mut gz, data).unwrap();
    gz.finish().unwrap()
}
/// Returns bit 0 followed by the bit position of every checkpoint reported
/// by `scan_deflate_fast` for the given `interval`.
///
/// Panics if the scan fails.
fn get_oracle_boundaries(deflate: &[u8], interval: usize) -> Vec<usize> {
    let scan = crate::decompress::scan_inflate::scan_deflate_fast(deflate, interval, 0)
        .expect("scan_deflate_fast");

    let mut bits: Vec<usize> = vec![0];
    bits.extend((&scan.checkpoints).into_iter().map(|cp| {
        // Only the low 8 bits of `bitsleft` take part in the conversion.
        let pending_bits = (cp.bitsleft as u8) as usize;
        cp.input_byte_pos * 8 - pending_bits
    }));
    bits
}
/// Reads the little-endian CRC32 stored in the first four of the last eight
/// bytes of `gzip_data`. Panics if the input is shorter than eight bytes.
fn gzip_crc32(gzip_data: &[u8]) -> u32 {
    let trailer = &gzip_data[gzip_data.len() - 8..];
    let mut crc_bytes = [0u8; 4];
    crc_bytes.copy_from_slice(&trailer[..4]);
    u32::from_le_bytes(crc_bytes)
}
/// Computes the CRC32 of `data` with `crc32fast`.
fn compute_crc32(data: &[u8]) -> u32 {
    let mut digest = crc32fast::Hasher::new();
    digest.update(data);
    let checksum = digest.finalize();
    checksum
}
/// Decoding from one oracle boundary must land exactly on the next one:
/// after `decode_until_bit`, the decoder's bit position has to equal the
/// requested end bit for every consecutive pair of boundaries.
#[test]
fn test_oracle_chain_convergence_8mb() {
let data = make_compressible_data(8 * 1024 * 1024);
let compressed = make_gzip_data(&data);
let header_size = crate::decompress::parallel::marker_decode::skip_gzip_header(&compressed)
.expect("valid header");
let deflate = &compressed[header_size..compressed.len() - 8];
let boundaries = get_oracle_boundaries(deflate, 1024 * 1024);
assert!(boundaries.len() >= 3, "need at least 3 oracle boundaries");
for i in 0..boundaries.len() - 1 {
let start_bit = boundaries[i];
let end_bit = boundaries[i + 1];
let start_byte = start_bit / 8;
let relative_start = start_bit % 8;
// Give the decoder 64 KiB of input beyond the target boundary.
let end_byte = (end_bit / 8 + 64 * 1024).min(deflate.len());
let data_slice = &deflate[start_byte..end_byte];
let mut decoder = MarkerDecoder::new(data_slice, relative_start);
// Bit positions passed to the decoder are relative to the start of `data_slice`.
let relative_end = end_bit - start_byte * 8;
decoder
.decode_until_bit(usize::MAX, relative_end)
.expect("decode should succeed");
let final_pos = decoder.bit_position();
assert_eq!(
final_pos,
relative_end,
"chunk {}: decode_until_bit landed at bit {} but expected {} (overshoot={})",
i,
final_pos,
relative_end,
final_pos as i64 - relative_end as i64
);
}
}
/// Same convergence check as the oracle variant, but for boundaries found by
/// `find_chunk_boundaries`: decoding from boundary `i` must stop exactly at
/// boundary `i + 1`. Skips when fewer than two boundaries exist.
#[test]
fn test_found_boundary_chain_convergence() {
let data = make_compressible_data(8 * 1024 * 1024);
let compressed = make_gzip_data(&data);
let header_size = crate::decompress::parallel::marker_decode::skip_gzip_header(&compressed)
.expect("valid header");
let deflate = &compressed[header_size..compressed.len() - 8];
let boundaries = find_chunk_boundaries(deflate, 4);
if boundaries.len() < 2 {
eprintln!("not enough boundaries, skipping");
return;
}
for i in 0..boundaries.len() - 1 {
let start_bit = boundaries[i];
let end_bit = boundaries[i + 1];
let start_byte = start_bit / 8;
let relative_start = start_bit % 8;
// Give the decoder 64 KiB of input beyond the target boundary.
let end_byte = (end_bit / 8 + 64 * 1024).min(deflate.len());
let data_slice = &deflate[start_byte..end_byte];
let mut decoder = MarkerDecoder::new(data_slice, relative_start);
// Bit positions passed to the decoder are relative to the start of `data_slice`.
let relative_end = end_bit - start_byte * 8;
decoder
.decode_until_bit(usize::MAX, relative_end)
.expect("decode should succeed");
let final_pos = decoder.bit_position();
assert_eq!(
final_pos,
relative_end,
"chunk {} (bit {}→{}): decoder landed at {} not {} — \
boundary {} is NOT on the same block chain (false positive!)",
i,
start_bit,
end_bit,
final_pos,
relative_end,
i + 1
);
}
}
/// Decodes every oracle-delimited chunk independently and checks that the
/// output lengths sum to the original data length.
#[test]
fn test_chunk_output_tiles_exactly_with_oracle() {
let data = make_compressible_data(8 * 1024 * 1024);
let compressed = make_gzip_data(&data);
let header_size = crate::decompress::parallel::marker_decode::skip_gzip_header(&compressed)
.expect("valid header");
let deflate = &compressed[header_size..compressed.len() - 8];
let boundaries = get_oracle_boundaries(deflate, 1024 * 1024);
assert!(boundaries.len() >= 3);
let mut total_output = 0usize;
for i in 0..boundaries.len() {
let start_bit = boundaries[i];
let is_last = i + 1 >= boundaries.len();
let start_byte = start_bit / 8;
let relative_start = start_bit % 8;
let mut decoder;
// The last chunk decodes the remaining tail; earlier chunks stop at the next boundary.
if is_last {
decoder = MarkerDecoder::new(&deflate[start_byte..], relative_start);
let _ = decoder.decode_until(deflate.len() * 8);
} else {
let end_bit = boundaries[i + 1];
let end_byte = (end_bit / 8 + 64 * 1024).min(deflate.len());
decoder = MarkerDecoder::new(&deflate[start_byte..end_byte], relative_start);
let relative_end = end_bit - start_byte * 8;
decoder.decode_until_bit(usize::MAX, relative_end).unwrap();
}
total_output += decoder.output().len();
}
assert_eq!(
total_output,
data.len(),
"oracle-chunked output {} != expected {}",
total_output,
data.len()
);
}
/// When the parallel decoder succeeds, the CRC32 of its output must equal the
/// value `gzip_crc32` reports for the input. A decoder error is only logged.
#[test]
fn test_parallel_output_crc_matches_trailer() {
    let original = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&original);

    let mut decoded = Vec::new();
    if let Err(e) = decompress_parallel(&gz, &mut decoded, 4) {
        eprintln!("parallel returned error (acceptable for now): {}", e);
        return;
    }

    let expected_crc = gzip_crc32(&gz);
    let actual_crc = compute_crc32(&decoded);
    assert_eq!(
        actual_crc, expected_crc,
        "CRC32 mismatch: output={:#010x} trailer={:#010x}",
        actual_crc, expected_crc
    );
}
/// Sanity check of the test helpers: the CRC32 of the uncompressed reference
/// data must equal the CRC that `gzip_crc32` reads from the gzip stream.
#[test]
fn test_sequential_reference_crc_matches() {
    let reference = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&reference);

    let trailer_crc = gzip_crc32(&gz);
    let reference_crc = compute_crc32(&reference);
    assert_eq!(
        reference_crc, trailer_crc,
        "reference data CRC doesn't match gzip trailer"
    );
}
/// Despite the `rejects` in its name, this test asserts that `try_decode_at`
/// ACCEPTS bit 0, which is the start of the deflate stream.
#[test]
fn test_try_decode_rejects_bit0() {
    use crate::decompress::parallel::marker_decode::skip_gzip_header;

    let original = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&original);
    let body_start = skip_gzip_header(&gz).expect("valid header");
    let deflate = &gz[body_start..gz.len() - 8];

    let accepted = try_decode_at(deflate, 0);
    assert!(
        accepted,
        "bit 0 should be accepted (it IS the stream start)"
    );
}
/// Every oracle boundary other than the first and the last must be accepted
/// by `try_decode_at`.
#[test]
fn test_try_decode_accepts_mid_stream_oracle() {
    use crate::decompress::parallel::marker_decode::skip_gzip_header;

    let original = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&original);
    let body_start = skip_gzip_header(&gz).expect("valid header");
    let deflate = &gz[body_start..gz.len() - 8];

    let boundaries = get_oracle_boundaries(deflate, 512 * 1024);
    // Interior boundaries only: skip index 0 and the final entry.
    let interior = &boundaries[1..boundaries.len().saturating_sub(1)];
    assert!(!interior.is_empty(), "need mid-stream boundaries");

    for &bit in interior {
        assert!(
            try_decode_at(deflate, bit),
            "real mid-stream boundary at bit {} rejected",
            bit
        );
    }
}
/// Probes the midpoint between each pair of consecutive oracle boundaries and
/// requires that more than 80% of those midpoints are rejected by
/// `try_decode_at`.
#[test]
fn test_try_decode_rejects_mid_block_positions() {
    use crate::decompress::parallel::marker_decode::skip_gzip_header;

    let original = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&original);
    let body_start = skip_gzip_header(&gz).expect("valid header");
    let deflate = &gz[body_start..gz.len() - 8];
    let boundaries = get_oracle_boundaries(deflate, 512 * 1024);
    assert!(boundaries.len() >= 3);

    // Midpoints that happen to coincide with a real boundary are not probed.
    let probes: Vec<usize> = boundaries
        .windows(2)
        .map(|pair| (pair[0] + pair[1]) / 2)
        .filter(|mid| !boundaries.contains(mid))
        .collect();
    let tested = probes.len();
    let rejected = probes
        .iter()
        .filter(|&&mid| !try_decode_at(deflate, mid))
        .count();

    // With nothing to probe, the rate is treated as a perfect 1.0.
    let rejection_rate = match tested {
        0 => 1.0,
        _ => rejected as f64 / tested as f64,
    };
    assert!(
        rejection_rate > 0.8,
        "mid-block positions should mostly be rejected: {}/{} rejected ({:.0}%)",
        rejected,
        tested,
        rejection_rate * 100.0
    );
}
/// Starting a `MarkerDecoder` at each non-zero oracle boundary must yield a
/// marker-to-output ratio below 20%.
#[test]
fn test_try_decode_marker_rate_at_oracle_boundary() {
    use crate::decompress::parallel::marker_decode::skip_gzip_header;

    let original = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&original);
    let body_start = skip_gzip_header(&gz).expect("valid header");
    let deflate = &gz[body_start..gz.len() - 8];
    let boundaries = get_oracle_boundaries(deflate, 1024 * 1024);

    for &bit in &boundaries[1..] {
        let first_byte = bit / 8;
        // Hand the decoder at most 2 MiB of input from this boundary.
        let span = (deflate.len() - first_byte).min(2 * 1024 * 1024);
        let mut decoder = MarkerDecoder::new(&deflate[first_byte..first_byte + span], bit % 8);
        let _ = decoder.decode_until(512 * 1024);

        let produced = decoder.output().len();
        let markers = decoder.marker_count();
        if produced == 0 {
            continue;
        }
        let rate = markers as f64 / produced as f64;
        assert!(
            rate < 0.20,
            "oracle boundary at bit {}: marker rate {:.1}% exceeds 20%",
            bit,
            rate * 100.0
        );
    }
}
/// Feeds 500 pseudo-random bit positions to `try_decode_at` and requires that
/// fewer than 10 of them are accepted.
#[test]
fn test_try_decode_500_random_positions() {
    use crate::decompress::parallel::marker_decode::skip_gzip_header;

    let original = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&original);
    let body_start = skip_gzip_header(&gz).expect("valid header");
    let deflate = &gz[body_start..gz.len() - 8];
    let total_bits = deflate.len() * 8;

    // Fixed-seed linear congruential generator keeps the positions reproducible.
    let mut state: u64 = 0x1234567890;
    let accepted = (0..500)
        .filter(|_| {
            state = state.wrapping_mul(6364136223846793005).wrapping_add(1);
            try_decode_at(deflate, (state as usize) % total_bits)
        })
        .count();

    assert!(
        accepted < 10,
        "try_decode accepted {}/500 random positions — too permissive",
        accepted
    );
}
/// Pushing three bytes into an empty window leaves exactly those bytes.
#[test]
fn test_update_window_small_data() {
    let incoming = [1u8, 2, 3];
    let mut window = Vec::new();
    update_window(&mut window, &incoming);
    assert_eq!(window, incoming.to_vec());
}
/// Input of exactly `WINDOW_SIZE` bytes is kept whole.
#[test]
fn test_update_window_exactly_window_size() {
    let payload: Vec<u8> = (0..WINDOW_SIZE).map(|i| (i % 256) as u8).collect();
    let mut window = Vec::new();
    update_window(&mut window, &payload);
    assert_eq!(window.len(), WINDOW_SIZE);
    assert_eq!(window, payload);
}
/// Input longer than `WINDOW_SIZE` is trimmed to its final `WINDOW_SIZE` bytes.
#[test]
fn test_update_window_larger_than_window_size() {
    let excess = 1000;
    let payload: Vec<u8> = (0..WINDOW_SIZE + excess).map(|i| (i % 256) as u8).collect();
    let mut window = Vec::new();
    update_window(&mut window, &payload);
    assert_eq!(window.len(), WINDOW_SIZE);
    assert_eq!(&window[..], &payload[excess..]);
}
/// Two small updates are appended one after the other.
#[test]
fn test_update_window_accumulates() {
    let mut window = Vec::new();
    for piece in [[1u8, 2, 3], [4u8, 5, 6]] {
        update_window(&mut window, &piece);
    }
    assert_eq!(window, vec![1, 2, 3, 4, 5, 6]);
}
/// Once the window is full, new bytes push out the oldest ones: after adding
/// two bytes the window still has `WINDOW_SIZE` bytes, ends with the new pair
/// and begins with what used to be at index 2.
#[test]
fn test_update_window_rotates_at_capacity() {
    let fill: Vec<u8> = (0..WINDOW_SIZE).map(|i| (i % 256) as u8).collect();
    let mut window = Vec::new();
    update_window(&mut window, &fill);
    assert_eq!(window.len(), WINDOW_SIZE);

    let extra = [0xAAu8, 0xBB];
    update_window(&mut window, &extra);
    assert_eq!(window.len(), WINDOW_SIZE);
    assert_eq!(window[WINDOW_SIZE - 2], 0xAA);
    assert_eq!(window[WINDOW_SIZE - 1], 0xBB);
    assert_eq!(window[0], fill[2]);
}
/// An empty update leaves the window unchanged.
#[test]
fn test_update_window_empty_data() {
    let before = vec![1u8, 2, 3];
    let mut window = before.clone();
    update_window(&mut window, &[]);
    assert_eq!(window, before);
}
/// The values 65, 66, 67 pass through `replace_markers` unchanged.
#[test]
fn test_replace_markers_no_markers() {
    let window = vec![0u8; WINDOW_SIZE];
    let mut symbols: Vec<u16> = vec![65, 66, 67];
    let untouched = symbols.clone();
    replace_markers(&mut symbols, &window);
    assert_eq!(symbols, untouched);
}
/// `MARKER_BASE` resolves to the last window byte and `MARKER_BASE + 1` to the
/// byte before it; the literal value 65 is left alone.
#[test]
fn test_replace_markers_with_markers() {
    use crate::decompress::parallel::marker_decode::MARKER_BASE;

    let mut window = vec![0u8; WINDOW_SIZE];
    window[WINDOW_SIZE - 2] = 0xFE;
    window[WINDOW_SIZE - 1] = 0xFF;

    let mut symbols: Vec<u16> = vec![65, MARKER_BASE, MARKER_BASE + 1];
    replace_markers(&mut symbols, &window);

    assert_eq!(symbols[0], 65);
    assert_eq!(symbols[1], 0xFF_u16);
    assert_eq!(symbols[2], 0xFE_u16);
}
/// Runs the parallel decoder with 4, 6 and 8 threads and requires that all
/// successful runs produce identical output. Failed runs are ignored.
#[test]
fn test_thread_count_independence() {
    let original = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&original);

    // Keep only the runs that succeeded, in thread-count order.
    let mut successful: Vec<(usize, Vec<u8>)> = Vec::new();
    for threads in [4, 6, 8] {
        let mut decoded = Vec::new();
        if decompress_parallel(&gz, &mut decoded, threads).is_ok() {
            successful.push((threads, decoded));
        }
    }

    // Comparing neighbours is enough to establish that all outputs are equal.
    for pair in successful.windows(2) {
        let (t1, v1) = (pair[0].0, &pair[0].1);
        let (t2, v2) = (pair[1].0, &pair[1].1);
        assert_eq!(
            v1,
            v2,
            "output differs between T{} ({} bytes) and T{} ({} bytes)",
            t1,
            v1.len(),
            t2,
            v2.len()
        );
    }
}
/// Compresses `data`, runs the parallel decoder with 4 threads and, when it
/// succeeds, asserts that the reported byte count and the output match `data`.
///
/// Two outcomes are tolerated and logged to stderr rather than failing the
/// test, so a vacuous pass is visible in the captured output:
/// * the compressed stream is shorter than `MIN_PARALLEL_SIZE` (skipped), and
/// * the parallel decoder returns an error.
///
/// `label` prefixes every message so the data set can be identified.
fn assert_parallel_correct_or_error(label: &str, data: &[u8]) {
    let compressed = make_gzip_data(data);
    if compressed.len() < MIN_PARALLEL_SIZE {
        eprintln!(
            "{}: compressed size {} is below MIN_PARALLEL_SIZE ({}), skipping",
            label,
            compressed.len(),
            MIN_PARALLEL_SIZE
        );
        return;
    }
    let mut output = Vec::new();
    match decompress_parallel(&compressed, &mut output, 4) {
        Ok(bytes) => {
            assert_eq!(
                bytes as usize,
                data.len(),
                "{}: size mismatch {} vs {}",
                label,
                bytes,
                data.len()
            );
            assert_eq!(&output, data, "{}: content mismatch", label);
        }
        // Errors remain acceptable, but are no longer swallowed silently.
        Err(e) => {
            eprintln!(
                "{}: parallel returned error (acceptable for now): {}",
                label, e
            );
        }
    }
}
/// Parallel decode of 8 MiB of compressible data: correct output or an error.
#[test]
fn test_parallel_compressible_data() {
    let payload = make_compressible_data(8 * 1024 * 1024);
    assert_parallel_correct_or_error("compressible", &payload);
}
/// Parallel decode of 8 MiB of zero bytes: correct output or an error.
#[test]
fn test_parallel_all_zeros() {
    let payload = make_all_zeros(8 * 1024 * 1024);
    assert_parallel_correct_or_error("all_zeros", &payload);
}
/// Inflating the whole deflate stream from bit 0 with an empty window must
/// reproduce the original data.
#[test]
fn test_inflate_bit_full_deflate_no_window() {
    use crate::backends::inflate_bit::decompress_deflate_from_bit_with_end as inflate_from_bit;
    use crate::decompress::parallel::marker_decode::skip_gzip_header;

    let data = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&data);
    let body_start = skip_gzip_header(&gz).expect("header");
    let deflate = &gz[body_start..gz.len() - 8];

    // Allow 1 KiB of headroom above the known output size.
    let max_output = data.len() + 1024;
    let (decoded, _) = match inflate_from_bit(deflate, 0, &[], max_output) {
        Some(pair) => pair,
        None => panic!(
            "inflate_bit should succeed for full deflate (bit 0, no window, max={})",
            data.len() + 1024
        ),
    };
    assert_eq!(
        decoded, data,
        "inflate_bit full deflate should match original"
    );
}
/// Splits the stream at the second oracle boundary: the first part is inflated
/// with no window, the rest is inflated from that boundary seeded with the
/// last `WINDOW_SIZE` bytes of the first part. Both parts concatenated must
/// equal the original data.
#[test]
fn test_inflate_bit_oracle_boundary_with_window() {
    use crate::backends::inflate_bit::decompress_deflate_from_bit_with_end as inflate_from_bit;
    use crate::decompress::parallel::marker_decode::skip_gzip_header;

    let data = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&data);
    let body_start = skip_gzip_header(&gz).expect("header");
    let deflate = &gz[body_start..gz.len() - 8];

    let oracle = get_oracle_boundaries(deflate, 2 * 1024 * 1024);
    if oracle.len() < 3 {
        return;
    }
    let split_bit = oracle[1];

    // First part: input is cut at the byte containing the split bit.
    let head_len = split_bit.div_ceil(8).min(deflate.len());
    let (head, head_end_bit) = inflate_from_bit(&deflate[..head_len], 0, &[], data.len())
        .expect("block 0 inflate failed");

    // The reported end may differ from the oracle by less than one byte.
    let diff = head_end_bit.abs_diff(split_bit);
    assert!(
        diff <= 7,
        "end_bit0={} vs oracle[1]={} differ by {} bits",
        head_end_bit,
        split_bit,
        diff
    );

    // Second part: resume at the oracle bit using the tail of the first part.
    let window = &head[head.len().saturating_sub(WINDOW_SIZE)..];
    let (tail, _) = inflate_from_bit(deflate, split_bit, window, data.len())
        .expect("oracle[1] inflate with window failed");

    assert_eq!(head.len() + tail.len(), data.len(), "size mismatch");
    let combined = [head.as_slice(), tail.as_slice()].concat();
    assert_eq!(combined, data, "combined output mismatch");
}
/// Parallel decode of 8 MiB of seeded random data: correct output or an error.
#[test]
fn test_parallel_random_data() {
    let payload = make_random_data(8 * 1024 * 1024, 42);
    assert_parallel_correct_or_error("random", &payload);
}
/// Parallel decode of 8 MiB of text-like data: correct output or an error.
#[test]
fn test_parallel_text_data() {
    let payload = make_text_data(8 * 1024 * 1024);
    assert_parallel_correct_or_error("text", &payload);
}
/// Parallel decode of 8 MiB of run-length style data: correct output or an error.
#[test]
fn test_parallel_rle_data() {
    let payload = make_rle_data(8 * 1024 * 1024);
    assert_parallel_correct_or_error("rle", &payload);
}
/// Parallel decode of 8 MiB of mixed data: correct output or an error.
#[test]
fn test_parallel_mixed_data() {
    let payload = make_mixed_data(8 * 1024 * 1024);
    assert_parallel_correct_or_error("mixed", &payload);
}
/// Compresses 8 MiB of compressible data at gzip `level` and, when the
/// parallel decoder (4 threads) succeeds, asserts size and content match.
///
/// A compressed stream shorter than `MIN_PARALLEL_SIZE` is skipped and a
/// decoder error is tolerated; both are logged to stderr so that a test that
/// verified nothing is visible in the captured output.
fn assert_parallel_correct_at_level(level: u32) {
    let data = make_compressible_data(8 * 1024 * 1024);
    let compressed = make_gzip_at_level(&data, level);
    if compressed.len() < MIN_PARALLEL_SIZE {
        eprintln!(
            "level {}: compressed size {} is below MIN_PARALLEL_SIZE ({}), skipping",
            level,
            compressed.len(),
            MIN_PARALLEL_SIZE
        );
        return;
    }
    let mut output = Vec::new();
    match decompress_parallel(&compressed, &mut output, 4) {
        Ok(bytes) => {
            assert_eq!(bytes as usize, data.len(), "level {}: size mismatch", level);
            assert_eq!(output, data, "level {}: content mismatch", level);
        }
        // Errors remain acceptable, but are no longer swallowed silently.
        Err(e) => {
            eprintln!(
                "level {}: parallel returned error (acceptable for now): {}",
                level, e
            );
        }
    }
}
/// Parallel decode of a stream compressed at gzip level 1.
#[test]
fn test_parallel_level_1() {
    let level = 1;
    assert_parallel_correct_at_level(level);
}
/// Parallel decode of a stream compressed at gzip level 6.
#[test]
fn test_parallel_level_6() {
    let level = 6;
    assert_parallel_correct_at_level(level);
}
/// Parallel decode of a stream compressed at gzip level 9.
#[test]
fn test_parallel_level_9() {
    let level = 9;
    assert_parallel_correct_at_level(level);
}
/// Decodes data built from `MIN_PARALLEL_SIZE * 3` uncompressed bytes. The
/// decoder's result is ignored, but anything it wrote must equal the original.
/// Note that the uncompressed size, not the compressed size, is derived from
/// the threshold here.
#[test]
fn test_parallel_exactly_min_size() {
    let original = make_compressible_data(MIN_PARALLEL_SIZE * 3);
    let gz = make_gzip_data(&original);

    let mut decoded = Vec::new();
    let _ = decompress_parallel(&gz, &mut decoded, 4);
    if decoded.is_empty() {
        return;
    }
    assert_eq!(&decoded, &original, "wrong output at min size boundary");
}
/// A stream built from only 1 KiB of input is rejected with `TooSmall`.
#[test]
fn test_parallel_below_min_size() {
    let tiny = make_compressible_data(1024);
    let gz = make_gzip_data(&tiny);
    let mut sink = Vec::new();
    let result = decompress_parallel(&gz, &mut sink, 4);
    assert!(matches!(result, Err(ParallelError::TooSmall)));
}
/// Requesting a single thread is rejected with `TooSmall`, since
/// `decompress_parallel` requires at least `MIN_THREADS_FOR_PARALLEL`.
#[test]
fn test_parallel_too_few_threads() {
    let original = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&original);
    let mut sink = Vec::new();
    let result = decompress_parallel(&gz, &mut sink, 1);
    assert!(matches!(result, Err(ParallelError::TooSmall)));
}
/// Ten bytes that do not form a gzip header must yield an error and leave the
/// writer untouched.
#[test]
fn test_parallel_invalid_header() {
    // Bytes 0x00 through 0x09.
    let data: Vec<u8> = (0x00..=0x09u8).collect();
    let mut output = Vec::new();
    let result = decompress_parallel(&data, &mut output, 4);
    assert!(result.is_err());
    assert!(output.is_empty(), "invalid header should produce no output");
}
/// Feeds the first half of a valid gzip stream to the parallel decoder.
///
/// An error is acceptable. If the decoder nevertheless reports success, the
/// reported byte count must be consistent with both the original data and the
/// bytes actually written, and the written bytes must be a prefix of the
/// original data.
#[test]
fn test_parallel_truncated_gzip() {
    let data = make_compressible_data(8 * 1024 * 1024);
    let compressed = make_gzip_data(&data);
    let truncated = &compressed[..compressed.len() / 2];
    let mut output = Vec::new();
    let result = decompress_parallel(truncated, &mut output, 4);
    if let Ok(bytes) = result {
        let reported = bytes as usize;
        // Check the bounds explicitly so a bogus count fails with a readable
        // message instead of an out-of-range slice panic.
        assert!(
            reported <= data.len(),
            "truncated input reported {} bytes but the original is only {} bytes",
            reported,
            data.len()
        );
        assert!(
            reported <= output.len(),
            "truncated input reported {} bytes but only {} bytes were written",
            reported,
            output.len()
        );
        assert_eq!(&output[..reported], &data[..reported]);
    }
}
/// An empty input slice must produce an error.
#[test]
fn test_parallel_empty_input() {
    let empty: &[u8] = &[];
    let mut output = Vec::new();
    let result = decompress_parallel(empty, &mut output, 4);
    assert!(result.is_err());
}
/// The first chunk starts at bit 0 of the stream, so decoding it up to the
/// second found boundary must not produce any markers. Checked for three
/// data sets.
#[test]
fn test_chunk0_always_zero_markers() {
    use crate::decompress::parallel::marker_decode::skip_gzip_header;

    let cases = [
        ("compressible", make_compressible_data(8 * 1024 * 1024)),
        ("text", make_text_data(8 * 1024 * 1024)),
        ("rle", make_rle_data(8 * 1024 * 1024)),
    ];
    for (label, data) in cases {
        let gz = make_gzip_data(&data);
        let body_start = skip_gzip_header(&gz).expect("header");
        let deflate = &gz[body_start..gz.len() - 8];

        let boundaries = find_chunk_boundaries(deflate, 4);
        if boundaries.len() < 2 {
            continue;
        }

        let end_bit = boundaries[1];
        let last_byte = (end_bit / 8 + 64 * 1024).min(deflate.len());
        let mut decoder = MarkerDecoder::new(&deflate[..last_byte], 0);
        let _ = decoder.decode_until_bit(usize::MAX, end_bit);

        let markers = decoder.marker_count();
        assert_eq!(
            markers,
            0,
            "{}: chunk 0 has {} markers (should be 0)",
            label,
            markers
        );
    }
}
/// Every oracle chunk after the first and before the last is decoded without
/// its preceding window, so it is expected to contain at least one marker.
#[test]
fn test_mid_chunks_have_markers() {
    use crate::decompress::parallel::marker_decode::skip_gzip_header;

    let original = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&original);
    let body_start = skip_gzip_header(&gz).expect("header");
    let deflate = &gz[body_start..gz.len() - 8];
    let boundaries = get_oracle_boundaries(deflate, 1024 * 1024);
    assert!(boundaries.len() >= 4);

    // Pair `i` covers boundaries[i]..boundaries[i + 1]; pair 0 is the first chunk.
    for (i, pair) in boundaries.windows(2).enumerate().skip(1) {
        let (start_bit, end_bit) = (pair[0], pair[1]);
        let first_byte = start_bit / 8;
        let last_byte = (end_bit / 8 + 64 * 1024).min(deflate.len());
        let mut decoder = MarkerDecoder::new(&deflate[first_byte..last_byte], start_bit % 8);
        let relative_end = end_bit - first_byte * 8;
        let _ = decoder.decode_until_bit(usize::MAX, relative_end);

        assert!(
            decoder.marker_count() > 0,
            "oracle chunk {}: 0 markers — every mid-stream chunk should have some",
            i
        );
    }
}
/// For every oracle chunk after the first and before the last, the ratio of
/// markers to output symbols must stay below 15%.
#[test]
fn test_marker_rate_reasonable_at_oracle_boundaries() {
    use crate::decompress::parallel::marker_decode::skip_gzip_header;

    let original = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&original);
    let body_start = skip_gzip_header(&gz).expect("header");
    let deflate = &gz[body_start..gz.len() - 8];
    let boundaries = get_oracle_boundaries(deflate, 1024 * 1024);

    for i in 1..boundaries.len() - 1 {
        let (start_bit, end_bit) = (boundaries[i], boundaries[i + 1]);
        let first_byte = start_bit / 8;
        let last_byte = (end_bit / 8 + 64 * 1024).min(deflate.len());
        let mut decoder = MarkerDecoder::new(&deflate[first_byte..last_byte], start_bit % 8);
        let relative_end = end_bit - first_byte * 8;
        let _ = decoder.decode_until_bit(usize::MAX, relative_end);

        let produced = decoder.output().len();
        let markers = decoder.marker_count();
        // A chunk with no output has no meaningful rate.
        if produced == 0 {
            continue;
        }
        let rate = markers as f64 / produced as f64;
        assert!(
            rate < 0.15,
            "oracle chunk {}: marker rate {:.1}% — real chunks should be <15%",
            i,
            rate * 100.0
        );
    }
}
// Decodes every oracle-delimited chunk independently and compares it with the
// corresponding range of the original data:
//   * chunk 0 is compared directly,
//   * inner chunks are compared after their markers are resolved with the
//     window of a scan checkpoint,
//   * the last chunk only contributes its length.
// Finally the chunk lengths must add up to the original length.
#[test]
fn test_each_oracle_chunk_matches_sequential_range() {
let data = make_compressible_data(8 * 1024 * 1024);
let compressed = make_gzip_data(&data);
let header_size = crate::decompress::parallel::marker_decode::skip_gzip_header(&compressed)
.expect("header");
// Deflate payload: everything between the gzip header and the 8 trailing bytes.
let deflate = &compressed[header_size..compressed.len() - 8];
// The scan is run with the same 1 MiB argument as the oracle call below.
// NOTE(review): the test relies on checkpoint `i - 1` lining up with oracle
// boundary `i` — confirm against get_oracle_boundaries.
let scan = crate::decompress::scan_inflate::scan_deflate_fast(deflate, 1024 * 1024, 0)
.expect("scan");
let boundaries = get_oracle_boundaries(deflate, 1024 * 1024);
// Running offset into `data` of the chunk currently being checked.
let mut expected_offset = 0usize;
for i in 0..boundaries.len() {
let start_bit = boundaries[i];
let is_last = i + 1 >= boundaries.len();
let start_byte = start_bit / 8;
let relative_start = start_bit % 8;
let mut decoder;
if is_last {
// Last chunk: the decoder gets the whole remaining stream; its result is ignored.
decoder = MarkerDecoder::new(&deflate[start_byte..], relative_start);
let _ = decoder.decode_until(deflate.len() * 8);
} else {
// Other chunks: decode up to the next boundary, with the slice extended
// by 64 KiB (clamped to the stream). Bit targets are slice-relative.
let end_bit = boundaries[i + 1];
let end_byte = (end_bit / 8 + 64 * 1024).min(deflate.len());
decoder = MarkerDecoder::new(&deflate[start_byte..end_byte], relative_start);
let relative_end = end_bit - start_byte * 8;
let _ = decoder.decode_until_bit(usize::MAX, relative_end);
}
let output = decoder.output();
let chunk_len = output.len();
if i == 0 {
// Chunk 0 is compared as-is: each symbol is narrowed to a byte with no
// marker replacement.
let chunk_bytes: Vec<u8> = output.iter().map(|&v| v as u8).collect();
assert_eq!(
&chunk_bytes,
&data[..chunk_len],
"chunk 0 doesn't match sequential"
);
} else if i < boundaries.len() - 1 {
// Inner chunk: resolve markers against the checkpoint window before comparing.
let cp = &scan.checkpoints[i - 1];
let mut resolved = output.to_vec();
replace_markers(&mut resolved, &cp.window);
let chunk_bytes: Vec<u8> = resolved.iter().map(|&v| v as u8).collect();
assert_eq!(
&chunk_bytes,
&data[expected_offset..expected_offset + chunk_len],
"chunk {} resolved output doesn't match sequential range [{}..{}]",
i,
expected_offset,
expected_offset + chunk_len
);
}
// The last chunk's content is not compared; only its length is counted.
expected_offset += chunk_len;
}
assert_eq!(expected_offset, data.len(), "total output size mismatch");
}
// `max_output_for_chunk_compat` must return at least 64 KiB even for a zero or
// very small argument.
#[test]
fn test_max_output_minimum() {
assert!(max_output_for_chunk_compat(0) >= 64 * 1024);
assert!(max_output_for_chunk_compat(100) >= 64 * 1024);
}
/// For 1 MiB and 10 MiB arguments, `max_output_for_chunk_compat` returns
/// eight times the argument.
#[test]
fn test_max_output_scaling() {
    const MIB: usize = 1024 * 1024;
    assert_eq!(max_output_for_chunk_compat(MIB), 8 * MIB);
    assert_eq!(max_output_for_chunk_compat(10 * MIB), 80 * MIB);
}
/// The first boundary returned by `find_chunk_boundaries` is always bit 0,
/// checked for 4 MiB and 8 MiB inputs.
#[test]
fn test_boundaries_always_start_at_zero() {
    use crate::decompress::parallel::marker_decode::skip_gzip_header;

    for size in [4 * 1024 * 1024, 8 * 1024 * 1024] {
        let original = make_compressible_data(size);
        let gz = make_gzip_data(&original);
        let body_start = skip_gzip_header(&gz).expect("header");
        let deflate = &gz[body_start..gz.len() - 8];

        let first = find_chunk_boundaries(deflate, 4)[0];
        assert_eq!(
            first, 0,
            "first boundary must be 0 for size {}",
            size
        );
    }
}
/// All boundaries must be strictly below the total bit length of the deflate
/// stream, for 2, 4, 8 and 16 requested chunks.
#[test]
fn test_boundaries_within_data_range() {
    use crate::decompress::parallel::marker_decode::skip_gzip_header;

    let original = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&original);
    let body_start = skip_gzip_header(&gz).expect("header");
    let deflate = &gz[body_start..gz.len() - 8];
    // The limit does not depend on the chunk count, so compute it once.
    let max_bit = deflate.len() * 8;

    for num_chunks in [2, 4, 8, 16] {
        let boundaries = find_chunk_boundaries(deflate, num_chunks);
        for (i, &b) in boundaries.iter().enumerate() {
            assert!(
                b < max_bit,
                "boundary {} at bit {} exceeds {} bits (chunks={})",
                i,
                b,
                max_bit,
                num_chunks
            );
        }
    }
}
/// Consecutive boundaries must be strictly increasing, for 2, 4 and 8
/// requested chunks.
#[test]
fn test_boundaries_strictly_increasing() {
    use crate::decompress::parallel::marker_decode::skip_gzip_header;

    let original = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&original);
    let body_start = skip_gzip_header(&gz).expect("header");
    let deflate = &gz[body_start..gz.len() - 8];

    for num_chunks in [2, 4, 8] {
        let boundaries = find_chunk_boundaries(deflate, num_chunks);
        for (idx, pair) in boundaries.windows(2).enumerate() {
            let (prev, next) = (pair[0], pair[1]);
            assert!(
                next > prev,
                "not strictly increasing at {} (chunks={}): {} >= {}",
                idx + 1,
                num_chunks,
                prev,
                next
            );
        }
    }
}
/// For 4 and 8 requested chunks the number of boundaries must be at least 2
/// and at most `num_chunks + 1`.
#[test]
fn test_boundaries_count_reasonable() {
    use crate::decompress::parallel::marker_decode::skip_gzip_header;

    let original = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&original);
    let body_start = skip_gzip_header(&gz).expect("header");
    let deflate = &gz[body_start..gz.len() - 8];

    for num_chunks in [4, 8] {
        let found = find_chunk_boundaries(deflate, num_chunks).len();
        assert!(
            found >= 2,
            "need at least 2 boundaries for {} chunks, got {}",
            num_chunks,
            found
        );
        assert!(
            found <= num_chunks + 1,
            "too many boundaries for {} chunks: got {}",
            num_chunks,
            found
        );
    }
}
/// No gap between consecutive boundaries may reach five times the average
/// spacing (total bits divided by the number of boundaries).
#[test]
fn test_boundaries_roughly_evenly_spaced() {
    use crate::decompress::parallel::marker_decode::skip_gzip_header;

    let original = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&original);
    let body_start = skip_gzip_header(&gz).expect("header");
    let deflate = &gz[body_start..gz.len() - 8];

    let boundaries = find_chunk_boundaries(deflate, 4);
    if boundaries.len() < 3 {
        return;
    }

    let expected_spacing = deflate.len() * 8 / boundaries.len();
    for (idx, pair) in boundaries.windows(2).enumerate() {
        let spacing = pair[1] - pair[0];
        assert!(
            spacing < expected_spacing * 5,
            "chunk {} spacing {} is >5x expected {} — boundary misplaced",
            idx + 1,
            spacing,
            expected_spacing
        );
    }
}
/// Decodes the chunk(s) described by the first two oracle boundaries and
/// checks that `resolve_and_write` succeeds and writes a non-empty output.
///
/// NOTE(review): `0` is passed for both the expected size and the expected
/// CRC; this presumably disables those checks in `resolve_and_write` — confirm.
/// The test therefore does not verify the content of the output.
#[test]
fn test_resolve_single_chunk_correct_output() {
    let data = make_compressible_data(8 * 1024 * 1024);
    let compressed = make_gzip_data(&data);
    let header_size = crate::decompress::parallel::marker_decode::skip_gzip_header(&compressed)
        .expect("header");
    let deflate = &compressed[header_size..compressed.len() - 8];
    let oracle = get_oracle_boundaries(deflate, 4 * 1024 * 1024);
    if oracle.len() < 2 {
        return;
    }
    let chunks = decode_chunks_parallel(deflate, &oracle[..2], 1).expect("decode");
    let mut output = Vec::new();
    let result = resolve_and_write(deflate, &chunks, &mut output, 0, 0);
    assert!(
        result.is_ok(),
        "single chunk resolve failed: {:?}",
        result.err()
    );
    assert!(!output.is_empty(), "output should not be empty");
}
/// Passing an expected size of 999999999 makes `resolve_and_write` fail with
/// `SizeMismatch`.
#[test]
fn test_resolve_size_mismatch_detected() {
    use crate::decompress::parallel::marker_decode::skip_gzip_header;

    let original = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&original);
    let body_start = skip_gzip_header(&gz).expect("header");
    let deflate = &gz[body_start..gz.len() - 8];

    let oracle = get_oracle_boundaries(deflate, 4 * 1024 * 1024);
    if oracle.len() < 2 {
        return;
    }

    let chunks = decode_chunks_parallel(deflate, &oracle[..2], 1).expect("decode");
    let mut sink = Vec::new();
    let bogus_size = 999999999;
    let result = resolve_and_write(deflate, &chunks, &mut sink, bogus_size, 0);
    assert!(matches!(result, Err(ParallelError::SizeMismatch)));
}
/// With an output limit of 1000 and a bit target at the end of the stream,
/// the decoder may produce at most four times the limit.
#[test]
fn test_decode_until_bit_respects_output_limit() {
    use crate::decompress::parallel::marker_decode::skip_gzip_header;

    let original = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&original);
    let body_start = skip_gzip_header(&gz).expect("header");
    let deflate = &gz[body_start..gz.len() - 8];

    let max_output = 1000;
    let mut decoder = MarkerDecoder::new(deflate, 0);
    // The bit target is the end of the stream, so only the output limit applies.
    let _ = decoder.decode_until_bit(max_output, deflate.len() * 8);

    let produced = decoder.output().len();
    assert!(
        produced <= max_output * 4,
        "output {} far exceeded limit {}",
        produced,
        max_output
    );
}
/// Decoding from bit 0 towards the second oracle boundary must leave the
/// decoder positioned exactly on that boundary.
#[test]
fn test_decode_until_bit_stops_at_exact_boundary() {
    use crate::decompress::parallel::marker_decode::skip_gzip_header;

    let original = make_compressible_data(4 * 1024 * 1024);
    let gz = make_gzip_data(&original);
    let body_start = skip_gzip_header(&gz).expect("header");
    let deflate = &gz[body_start..gz.len() - 8];

    let boundaries = get_oracle_boundaries(deflate, 512 * 1024);
    if boundaries.len() < 3 {
        return;
    }

    let target = boundaries[1];
    let mut decoder = MarkerDecoder::new(deflate, 0);
    decoder.decode_until_bit(usize::MAX, target).unwrap();
    let landed = decoder.bit_position();
    assert_eq!(
        landed,
        target,
        "should stop exactly at oracle boundary"
    );
}
/// Every candidate reported by `BlockFinder::find_blocks` must have a bit
/// offset below the stream's bit length.
#[test]
fn test_block_finder_candidates_are_valid_positions() {
    use crate::decompress::parallel::marker_decode::skip_gzip_header;

    let original = make_compressible_data(4 * 1024 * 1024);
    let gz = make_gzip_data(&original);
    let body_start = skip_gzip_header(&gz).expect("header");
    let deflate = &gz[body_start..gz.len() - 8];
    let total_bits = deflate.len() * 8;

    let finder = BlockFinder::new(deflate);
    let candidates = finder.find_blocks(0, total_bits);
    for candidate in candidates.iter() {
        assert!(
            candidate.bit_offset < total_bits,
            "candidate at bit {} exceeds data length {} bits",
            candidate.bit_offset,
            total_bits
        );
    }
}
/// Candidates from `BlockFinder::find_blocks` must be ordered by
/// non-decreasing bit offset.
#[test]
fn test_block_finder_sorted_output() {
    use crate::decompress::parallel::marker_decode::skip_gzip_header;

    let original = make_compressible_data(4 * 1024 * 1024);
    let gz = make_gzip_data(&original);
    let body_start = skip_gzip_header(&gz).expect("header");
    let deflate = &gz[body_start..gz.len() - 8];

    let finder = BlockFinder::new(deflate);
    let candidates = finder.find_blocks(0, deflate.len() * 8);
    for (idx, pair) in candidates.windows(2).enumerate() {
        assert!(
            pair[1].bit_offset >= pair[0].bit_offset,
            "candidates not sorted at index {}",
            idx + 1
        );
    }
}
/// When `resolve_and_write` fails (forced here with an expected size of
/// 999999999), nothing may have been written to the output.
#[test]
fn test_pipeline_wrong_output_never_committed() {
    use crate::decompress::parallel::marker_decode::skip_gzip_header;

    let original = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&original);
    let body_start = skip_gzip_header(&gz).expect("header");
    let deflate = &gz[body_start..gz.len() - 8];

    let oracle = get_oracle_boundaries(deflate, 4 * 1024 * 1024);
    if oracle.len() < 2 {
        return;
    }

    let chunks = decode_chunks_parallel(deflate, &oracle[..2], 1).expect("decode");
    let mut output = Vec::new();
    let bogus_size = 999999999;
    let result = resolve_and_write(deflate, &chunks, &mut output, bogus_size, 0);
    assert!(result.is_err());
    assert!(output.is_empty(), "on error, no output should be written");
}
/// Regression guard: when chunk decoding succeeds, every chunk must span more
/// than one tenth of the average bits per chunk.
#[test]
fn test_regression_34k_false_positive() {
    use crate::decompress::parallel::marker_decode::skip_gzip_header;

    let original = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&original);
    let body_start = skip_gzip_header(&gz).expect("header");
    let deflate = &gz[body_start..gz.len() - 8];

    let boundaries = find_chunk_boundaries(deflate, 4);
    if boundaries.len() < 2 {
        return;
    }
    // A decode failure is tolerated here.
    let Ok(chunks) = decode_chunks_parallel(deflate, &boundaries, 4) else {
        return;
    };

    let expected_bits_per_chunk = deflate.len() * 8 / chunks.len();
    let min_bits = expected_bits_per_chunk / 10;
    for (i, chunk) in chunks.iter().enumerate() {
        let span = chunk.end_bit - chunk.start_bit;
        assert!(
            span > min_bits,
            "chunk {} spans only {} bits (expected ~{})",
            i,
            span,
            expected_bits_per_chunk
        );
    }
}
/// Regression guard: when chunk decoding succeeds, every chunk must end
/// strictly after the bit at which it starts.
#[test]
fn test_regression_all_markers_false_positive() {
    use crate::decompress::parallel::marker_decode::skip_gzip_header;

    let original = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&original);
    let body_start = skip_gzip_header(&gz).expect("header");
    let deflate = &gz[body_start..gz.len() - 8];

    let boundaries = find_chunk_boundaries(deflate, 4);
    if boundaries.len() < 2 {
        return;
    }
    // A decode failure is tolerated here.
    let Ok(chunks) = decode_chunks_parallel(deflate, &boundaries, 4) else {
        return;
    };

    for (i, chunk) in chunks.iter().enumerate() {
        let (start, end) = (chunk.start_bit, chunk.end_bit);
        assert!(
            end > start,
            "chunk {} has zero or inverted bit span: start={} end={}",
            i,
            start,
            end
        );
    }
}
/// Regression guard: the input slice computed for chunk 1 (up to the next
/// boundary plus 64 KiB) must be shorter than the whole remaining stream.
#[test]
fn test_regression_full_tail_slice() {
    use crate::decompress::parallel::marker_decode::skip_gzip_header;

    let original = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&original);
    let body_start = skip_gzip_header(&gz).expect("header");
    let deflate = &gz[body_start..gz.len() - 8];

    let boundaries = find_chunk_boundaries(deflate, 4);
    if boundaries.len() < 3 {
        return;
    }

    let first_byte = boundaries[1] / 8;
    let slice_end = (boundaries[2] / 8 + 64 * 1024).min(deflate.len());
    let slice_len = slice_end - first_byte;
    let remaining = deflate.len() - first_byte;
    assert!(
        slice_len < remaining,
        "chunk 1 slice ({} bytes) equals full tail ({} bytes)",
        slice_len,
        remaining
    );
}
/// Decodes `gzip_data` sequentially with `flate2::read::GzDecoder` and in
/// parallel with 4 threads. When the parallel decoder succeeds, its byte count
/// and output must equal the sequential result.
///
/// A parallel decoder error is tolerated but logged to stderr, so a test that
/// compared nothing is visible in the captured output. `label` prefixes every
/// message.
///
/// # Panics
/// Panics if the sequential reference decode fails.
fn assert_parallel_matches_sequential(label: &str, gzip_data: &[u8]) {
    let mut seq_output = Vec::new();
    let mut decoder = flate2::read::GzDecoder::new(gzip_data);
    std::io::Read::read_to_end(&mut decoder, &mut seq_output).unwrap();
    let mut par_output = Vec::new();
    match decompress_parallel(gzip_data, &mut par_output, 4) {
        Ok(bytes) => {
            assert_eq!(
                bytes as usize,
                seq_output.len(),
                "{}: parallel size {} != sequential size {}",
                label,
                bytes,
                seq_output.len()
            );
            assert_eq!(
                par_output, seq_output,
                "{}: parallel content != sequential content",
                label
            );
        }
        // Errors remain acceptable, but are no longer swallowed silently.
        Err(e) => {
            eprintln!(
                "{}: parallel returned error (acceptable for now): {}",
                label, e
            );
        }
    }
}
/// Parallel and sequential decoders must agree on compressible data.
#[test]
fn test_parallel_eq_sequential_compressible() {
    let gz = make_gzip_data(&make_compressible_data(8 * 1024 * 1024));
    assert_parallel_matches_sequential("compressible", &gz);
}
/// Parallel and sequential decoders must agree on text-like data.
#[test]
fn test_parallel_eq_sequential_text() {
    let gz = make_gzip_data(&make_text_data(8 * 1024 * 1024));
    assert_parallel_matches_sequential("text", &gz);
}
/// Parallel and sequential decoders must agree on run-length style data.
#[test]
fn test_parallel_eq_sequential_rle() {
    let gz = make_gzip_data(&make_rle_data(8 * 1024 * 1024));
    assert_parallel_matches_sequential("rle", &gz);
}
/// With oracle boundaries the full pipeline (chunk decode, then resolve and
/// write with size and CRC checks) must reproduce the original data exactly.
#[test]
fn test_oracle_pipeline_byte_exact() {
    use crate::decompress::parallel::marker_decode::skip_gzip_header;

    let original = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&original);
    let body_start = skip_gzip_header(&gz).expect("header");
    let deflate = &gz[body_start..gz.len() - 8];

    let boundaries = get_oracle_boundaries(deflate, 1024 * 1024);
    assert!(boundaries.len() >= 3);

    let chunks = decode_chunks_parallel(deflate, &boundaries, 4).expect("decode");
    let trailer_crc = gzip_crc32(&gz);
    let mut written = Vec::new();
    let outcome = resolve_and_write(deflate, &chunks, &mut written, original.len(), trailer_crc);
    assert!(outcome.is_ok(), "oracle pipeline failed: {:?}", outcome.err());
    assert_eq!(written, original, "oracle pipeline content mismatch");
}
/// Two calls to `find_chunk_boundaries` on the same input return the same
/// boundaries.
#[test]
fn test_find_boundaries_deterministic() {
    use crate::decompress::parallel::marker_decode::skip_gzip_header;

    let original = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&original);
    let body_start = skip_gzip_header(&gz).expect("header");
    let deflate = &gz[body_start..gz.len() - 8];

    let first_run = find_chunk_boundaries(deflate, 4);
    let second_run = find_chunk_boundaries(deflate, 4);
    assert_eq!(first_run, second_run, "boundaries should be deterministic");
}
/// Two parallel decodes of the same input must either both fail or both
/// succeed with identical output.
#[test]
fn test_parallel_output_deterministic() {
    let original = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&original);

    let mut out1 = Vec::new();
    let first_ok = decompress_parallel(&gz, &mut out1, 4).is_ok();
    let mut out2 = Vec::new();
    let second_ok = decompress_parallel(&gz, &mut out2, 4).is_ok();

    match (first_ok, second_ok) {
        (true, true) => assert_eq!(out1, out2, "parallel output should be deterministic"),
        // Failing consistently is acceptable.
        (false, false) => {}
        _ => panic!("inconsistent success/failure between runs"),
    }
}
/// Runs the correct-or-error check on compressible inputs of 5, 8, 12 and
/// 16 MiB.
#[test]
fn test_parallel_various_sizes() {
    for size_mb in [5usize, 8, 12, 16] {
        let label = format!("{}MB", size_mb);
        let payload = make_compressible_data(size_mb * 1024 * 1024);
        assert_parallel_correct_or_error(&label, &payload);
    }
}
/// On seeded random data the boundaries must still start at 0, increase
/// strictly and stay below the stream's bit length.
#[test]
fn test_brute_force_boundaries_still_valid() {
    use crate::decompress::parallel::marker_decode::skip_gzip_header;

    let noise = make_random_data(8 * 1024 * 1024, 999);
    let gz = make_gzip_data(&noise);
    let body_start = skip_gzip_header(&gz).expect("header");
    let deflate = &gz[body_start..gz.len() - 8];

    let boundaries = find_chunk_boundaries(deflate, 4);
    assert_eq!(boundaries[0], 0);
    for i in 1..boundaries.len() {
        assert!(boundaries[i] > boundaries[i - 1]);
        assert!(boundaries[i] < deflate.len() * 8);
    }
}
/// Each decoded chunk must span fewer than four times the average number of
/// compressed bits per chunk. Skipped when fewer than two boundaries are
/// found or when chunk decoding fails.
#[test]
fn test_chunk_output_sizes_reasonable() {
    let payload = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&payload);
    let header_len =
        crate::decompress::parallel::marker_decode::skip_gzip_header(&gz).expect("header");
    let deflate = &gz[header_len..gz.len() - 8];

    let boundaries = find_chunk_boundaries(deflate, 4);
    if boundaries.len() < 2 {
        return;
    }
    let Ok(chunks) = decode_chunks_parallel(deflate, &boundaries, 4) else {
        return;
    };

    let average_bits = deflate.len() * 8 / chunks.len();
    for (idx, chunk) in chunks.iter().enumerate() {
        let span = chunk.end_bit - chunk.start_bit;
        assert!(
            span < average_bits * 4,
            "chunk {} spans {} bits, expected ~{} — likely overshoot or false positive",
            idx,
            span,
            average_bits
        );
    }
}
/// Every boundary after the first returned by `find_chunk_boundaries` must
/// lie within 1 MiB (in bits) of some boundary from `get_oracle_boundaries`.
#[test]
fn test_found_boundaries_near_oracle() {
    let payload = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&payload);
    let header_len =
        crate::decompress::parallel::marker_decode::skip_gzip_header(&gz).expect("header");
    let deflate = &gz[header_len..gz.len() - 8];

    let oracle = get_oracle_boundaries(deflate, 2 * 1024 * 1024);
    let found = find_chunk_boundaries(deflate, 4);
    let one_mb_bits = 1024 * 1024 * 8;

    for &candidate in &found[1..] {
        // Distance to the closest oracle boundary; usize::MAX when the oracle list is empty.
        let nearest = oracle
            .iter()
            .fold(usize::MAX, |best, &ob| best.min(candidate.abs_diff(ob)));
        assert!(
            nearest < one_mb_bits,
            "found boundary at bit {} is {} bits from nearest oracle (>1MB)",
            candidate,
            nearest
        );
    }
}
/// Decodes from the second oracle boundary with no prior window and checks
/// that every output value above 255 is at least `MARKER_BASE`.
/// Skipped when fewer than three boundaries exist or no markers were emitted.
#[test]
fn test_marker_encoding_distance_1() {
    use crate::decompress::parallel::marker_decode::MARKER_BASE;
    let payload = make_compressible_data(64 * 1024);
    let gz = make_gzip_data(&payload);
    let header_len =
        crate::decompress::parallel::marker_decode::skip_gzip_header(&gz).expect("header");
    let deflate = &gz[header_len..gz.len() - 8];

    let boundaries = get_oracle_boundaries(deflate, 16 * 1024);
    if boundaries.len() < 3 {
        return;
    }

    let start_bit = boundaries[1];
    let first_byte = start_bit / 8;
    // Give the decoder up to 64 KiB of input past the next boundary.
    let last_byte = (boundaries[2] / 8 + 64 * 1024).min(deflate.len());
    let mut decoder = MarkerDecoder::new(&deflate[first_byte..last_byte], start_bit % 8);
    let _ = decoder.decode_until(32 * 1024);

    if decoder.marker_count() == 0 {
        return;
    }
    for val in decoder.output().iter().copied().filter(|&v| v > 255) {
        assert!(
            val >= MARKER_BASE,
            "marker value {} < MARKER_BASE {}",
            val,
            MARKER_BASE
        );
    }
}
/// The largest marker (`MARKER_BASE` plus the last window offset) must be
/// exactly `u16::MAX`.
#[test]
fn test_marker_encoding_distance_max() {
    use crate::decompress::parallel::marker_decode::MARKER_BASE;
    let last_offset = WINDOW_SIZE as u16 - 1;
    let max_marker = MARKER_BASE + last_offset;
    assert_eq!(max_marker, u16::MAX);
}
/// For several offsets, `replace_markers` must turn `MARKER_BASE + offset`
/// into the window byte at index `WINDOW_SIZE - 1 - offset`.
#[test]
fn test_marker_encoding_roundtrip() {
    use crate::decompress::parallel::marker_decode::MARKER_BASE;
    // Window whose byte at index i is i mod 256, so each position is recognisable.
    let window: Vec<u8> = (0..WINDOW_SIZE).map(|i| (i % 256) as u8).collect();

    for offset in [0usize, 1, 100, WINDOW_SIZE - 1] {
        let mut data = vec![MARKER_BASE + offset as u16];
        replace_markers(&mut data, &window);
        let expected = window[WINDOW_SIZE - 1 - offset];
        assert_eq!(
            data[0], expected as u16,
            "roundtrip failed for offset {}: got {} expected {}",
            offset, data[0], expected
        );
    }
}
/// Decodes a hand-built final stored block (LEN = 5, NLEN = 0xFFFA) and
/// checks that the five payload bytes come back unchanged.
#[test]
fn test_decode_block_stored() {
    let payload = [0x41u8, 0x42, 0x43, 0x44, 0x45];
    // Header byte 0x01: BFINAL = 1, BTYPE = 00 (stored); then LEN and NLEN, little-endian.
    let mut block = vec![0x01, 5, 0, 0xFA, 0xFF];
    block.extend_from_slice(&payload);

    let mut decoder = MarkerDecoder::new(&block, 0);
    let is_final = decoder.decode_block().unwrap();
    assert!(is_final, "should be final block");

    let symbols = decoder.output();
    assert_eq!(symbols.len(), 5);
    let bytes: Vec<u8> = symbols.iter().map(|&sym| sym as u8).collect();
    assert_eq!(bytes, payload.to_vec());
}
/// Decodes a short repetitive string compressed by `make_gzip_data` and
/// compares the output with the input.
// NOTE(review): the block type the encoder picks for such a tiny input is not
// verified here; it may not be a dynamic Huffman block — confirm.
#[test]
fn test_decode_block_dynamic_huffman() {
    let data = b"hello world hello world hello world";
    let gz = make_gzip_data(data);
    let header_len =
        crate::decompress::parallel::marker_decode::skip_gzip_header(&gz).expect("header");
    let deflate = &gz[header_len..gz.len() - 8];

    let mut decoder = MarkerDecoder::new(deflate, 0);
    // The decode result is deliberately ignored; only the produced output is checked.
    let _ = decoder.decode_until(1024);
    let output: Vec<u8> = decoder.output().iter().map(|&sym| sym as u8).collect();
    assert_eq!(&output, data, "dynamic huffman decode mismatch");
}
/// A one-byte payload must decode without error even though the decoder is
/// asked for up to 1024 bytes of output.
#[test]
fn test_eof_zero_fill_behavior() {
    let data = b"x";
    let gz = make_gzip_data(data);
    let header_len =
        crate::decompress::parallel::marker_decode::skip_gzip_header(&gz).expect("header");
    let deflate = &gz[header_len..gz.len() - 8];

    let mut decoder = MarkerDecoder::new(deflate, 0);
    let outcome = decoder.decode_until(1024);
    assert!(outcome.is_ok(), "should not error on short stream");

    let output: Vec<u8> = decoder.output().iter().map(|&sym| sym as u8).collect();
    assert_eq!(&output, data);
}
/// Decodes up to ten blocks and checks that `bit_position` strictly
/// increases after every successful `decode_block`.
#[test]
fn test_bit_position_tracks_across_blocks() {
    let payload = make_compressible_data(256 * 1024);
    let gz = make_gzip_data(&payload);
    let header_len =
        crate::decompress::parallel::marker_decode::skip_gzip_header(&gz).expect("header");
    let deflate = &gz[header_len..gz.len() - 8];

    let mut decoder = MarkerDecoder::new(deflate, 0);
    let mut last_pos = 0;
    for _ in 0..10 {
        // Stop at the first decode error.
        if decoder.decode_block().is_err() {
            break;
        }
        let pos = decoder.bit_position();
        assert!(
            pos > last_pos,
            "bit_position didn't advance: {} -> {}",
            last_pos,
            pos
        );
        last_pos = pos;
    }
    assert!(last_pos > 0, "should have advanced past bit 0");
}
/// A freshly constructed decoder must report the bit offset it was given.
#[test]
fn test_bit_position_starts_at_offset() {
    let payload = make_compressible_data(64 * 1024);
    let gz = make_gzip_data(&payload);
    let header_len =
        crate::decompress::parallel::marker_decode::skip_gzip_header(&gz).expect("header");
    let deflate = &gz[header_len..gz.len() - 8];

    let initial_pos = MarkerDecoder::new(deflate, 5).bit_position();
    assert_eq!(initial_pos, 5, "should start at requested bit offset");
}
/// Compresses `data` with `make_gzip_data`, then for each consecutive pair of
/// oracle boundaries decodes from the first and asserts the decoder stops
/// exactly on the second. Returns silently if fewer than three boundaries exist.
///
/// `label` only appears in the assertion message.
fn assert_oracle_chain_convergence(label: &str, data: &[u8]) {
    let gz = make_gzip_data(data);
    let header_len =
        crate::decompress::parallel::marker_decode::skip_gzip_header(&gz).expect("header");
    let deflate = &gz[header_len..gz.len() - 8];

    let boundaries = get_oracle_boundaries(deflate, 512 * 1024);
    if boundaries.len() < 3 {
        return;
    }

    for (i, pair) in boundaries.windows(2).enumerate() {
        let (start_bit, end_bit) = (pair[0], pair[1]);
        let first_byte = start_bit / 8;
        // Allow up to 64 KiB of input past the target boundary.
        let last_byte = (end_bit / 8 + 64 * 1024).min(deflate.len());
        let mut decoder = MarkerDecoder::new(&deflate[first_byte..last_byte], start_bit % 8);

        // The target bit position, relative to the start of the sliced input.
        let relative_end = end_bit - first_byte * 8;
        decoder
            .decode_until_bit(usize::MAX, relative_end)
            .expect("decode");

        let landed = decoder.bit_position();
        assert_eq!(
            landed,
            relative_end,
            "{}: chunk {} overshoot: landed at {} expected {} (delta={})",
            label,
            i,
            landed,
            relative_end,
            landed as i64 - relative_end as i64
        );
    }
}
/// Same check as the default-level chain-convergence helper, but the input is
/// compressed with `make_gzip_at_level(data, level)`. For each consecutive
/// pair of oracle boundaries the decoder must stop exactly on the second.
/// Returns silently if fewer than three boundaries exist.
fn assert_oracle_chain_convergence_at_level(label: &str, data: &[u8], level: u32) {
    let gz = make_gzip_at_level(data, level);
    let header_len =
        crate::decompress::parallel::marker_decode::skip_gzip_header(&gz).expect("header");
    let deflate = &gz[header_len..gz.len() - 8];

    let boundaries = get_oracle_boundaries(deflate, 512 * 1024);
    if boundaries.len() < 3 {
        return;
    }

    for (i, pair) in boundaries.windows(2).enumerate() {
        let (start_bit, end_bit) = (pair[0], pair[1]);
        let first_byte = start_bit / 8;
        let last_byte = (end_bit / 8 + 64 * 1024).min(deflate.len());
        let mut decoder = MarkerDecoder::new(&deflate[first_byte..last_byte], start_bit % 8);

        // The target bit position, relative to the start of the sliced input.
        let relative_end = end_bit - first_byte * 8;
        decoder
            .decode_until_bit(usize::MAX, relative_end)
            .expect("decode");

        assert_eq!(
            decoder.bit_position(),
            relative_end,
            "{} L{}: chunk {} overshoot",
            label,
            level,
            i
        );
    }
}
/// Chain-convergence check on 4 MiB of data from `make_text_data`.
#[test]
fn test_chain_convergence_text_data() {
    let payload = make_text_data(4 * 1024 * 1024);
    assert_oracle_chain_convergence("text", payload.as_slice());
}
/// Chain-convergence check on 4 MiB of data from `make_rle_data`.
#[test]
fn test_chain_convergence_rle_data() {
    let payload = make_rle_data(4 * 1024 * 1024);
    assert_oracle_chain_convergence("rle", payload.as_slice());
}
/// Chain-convergence check on 4 MiB of data from `make_mixed_data`.
#[test]
fn test_chain_convergence_mixed_data() {
    let payload = make_mixed_data(4 * 1024 * 1024);
    assert_oracle_chain_convergence("mixed", payload.as_slice());
}
/// Chain-convergence check on 4 MiB of compressible data at gzip level 1.
#[test]
fn test_chain_convergence_level1() {
    let payload = make_compressible_data(4 * 1024 * 1024);
    assert_oracle_chain_convergence_at_level("compressible", payload.as_slice(), 1);
}
/// Chain-convergence check on 4 MiB of compressible data at gzip level 9.
#[test]
fn test_chain_convergence_level9() {
    let payload = make_compressible_data(4 * 1024 * 1024);
    assert_oracle_chain_convergence_at_level("compressible", payload.as_slice(), 9);
}
/// Probes 200 pseudo-random bit positions. For every position accepted by
/// `try_decode_at`, decoding from there must yield a marker rate below 20%.
#[test]
fn test_false_positive_detected_by_marker_rate() {
    let payload = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&payload);
    let header_len =
        crate::decompress::parallel::marker_decode::skip_gzip_header(&gz).expect("header");
    let deflate = &gz[header_len..gz.len() - 8];
    let stream_bits = deflate.len() * 8;

    // Fixed-seed linear congruential generator keeps the probe positions reproducible.
    let mut state: u64 = 0xfeedface;
    for _ in 0..200 {
        state = state.wrapping_mul(6364136223846793005).wrapping_add(1);
        let bit = (state as usize) % stream_bits;
        if !try_decode_at(deflate, bit) {
            continue;
        }

        let first_byte = bit / 8;
        let slice_len = (deflate.len() - first_byte).min(2 * 1024 * 1024);
        let mut decoder =
            MarkerDecoder::new(&deflate[first_byte..first_byte + slice_len], bit % 8);
        let _ = decoder.decode_until(1024 * 1024);

        let produced = decoder.output().len();
        if produced == 0 {
            continue;
        }
        let rate = decoder.marker_count() as f64 / produced as f64;
        assert!(
            rate < 0.20,
            "accepted position at bit {} has {:.1}% markers",
            bit,
            rate * 100.0
        );
    }
}
/// Smoke test: calls `try_decode_at` at the middle bit of the stream and
/// discards the answer. It asserts nothing about the result.
#[test]
fn test_false_positive_detected_by_output_volume() {
    let payload = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&payload);
    let header_len =
        crate::decompress::parallel::marker_decode::skip_gzip_header(&gz).expect("header");
    let deflate = &gz[header_len..gz.len() - 8];

    // len * 4 is half of len * 8, i.e. the midpoint measured in bits.
    let mid_bit = deflate.len() * 4;
    let _ = try_decode_at(deflate, mid_bit);
}
/// The true start of the stream (bit 0) must be accepted by `try_decode_at`.
#[test]
fn test_false_positive_detected_by_zero_markers() {
    let payload = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&payload);
    let header_len =
        crate::decompress::parallel::marker_decode::skip_gzip_header(&gz).expect("header");
    let deflate = &gz[header_len..gz.len() - 8];

    let accepted = try_decode_at(deflate, 0);
    assert!(accepted, "bit 0 should be accepted (real stream start)");
}
/// Diagnostic test: decodes between consecutive found boundaries and logs to
/// stderr any chunk whose decoder does not stop exactly on the next boundary.
/// It never fails on a mismatch; it only reports it.
#[test]
fn test_false_positive_detected_by_chain_mismatch() {
    let payload = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&payload);
    let header_len =
        crate::decompress::parallel::marker_decode::skip_gzip_header(&gz).expect("header");
    let deflate = &gz[header_len..gz.len() - 8];

    let boundaries = find_chunk_boundaries(deflate, 4);
    if boundaries.len() < 3 {
        return;
    }

    for (i, pair) in boundaries.windows(2).enumerate() {
        let (start_bit, end_bit) = (pair[0], pair[1]);
        let first_byte = start_bit / 8;
        let last_byte = (end_bit / 8 + 64 * 1024).min(deflate.len());
        let mut decoder = MarkerDecoder::new(&deflate[first_byte..last_byte], start_bit % 8);

        let relative_end = end_bit - first_byte * 8;
        let _ = decoder.decode_until_bit(usize::MAX, relative_end);

        let final_pos = decoder.bit_position();
        if final_pos == relative_end {
            continue;
        }
        eprintln!(
            "chain mismatch at chunk {}: bit {} -> {} (expected {})",
            i,
            start_bit,
            // Convert back to an absolute bit position for the report.
            final_pos + first_byte * 8,
            end_bit
        );
    }
}
/// When decoding between consecutive oracle boundaries, the decoder must
/// never end past the target boundary.
#[test]
fn test_bit_position_overshoot_means_false_positive() {
    let payload = make_compressible_data(4 * 1024 * 1024);
    let gz = make_gzip_data(&payload);
    let header_len =
        crate::decompress::parallel::marker_decode::skip_gzip_header(&gz).expect("header");
    let deflate = &gz[header_len..gz.len() - 8];

    let oracle = get_oracle_boundaries(deflate, 512 * 1024);
    assert!(oracle.len() >= 3);

    for (i, pair) in oracle.windows(2).enumerate() {
        let (start_bit, end_bit) = (pair[0], pair[1]);
        let first_byte = start_bit / 8;
        let last_byte = (end_bit / 8 + 64 * 1024).min(deflate.len());
        let mut decoder = MarkerDecoder::new(&deflate[first_byte..last_byte], start_bit % 8);

        let relative_end = end_bit - first_byte * 8;
        let _ = decoder.decode_until_bit(usize::MAX, relative_end);

        let final_pos = decoder.bit_position();
        assert!(
            final_pos <= relative_end,
            "oracle boundary {}: overshoot {} > {} (delta={})",
            i,
            final_pos,
            relative_end,
            // Only evaluated on failure, where final_pos > relative_end.
            final_pos - relative_end
        );
    }
}
/// Samples 1000 pseudo-random bit positions and requires that fewer than 1%
/// of them are accepted by `try_decode_at`.
#[test]
fn test_false_positive_rate_under_1_percent() {
    let payload = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&payload);
    let header_len =
        crate::decompress::parallel::marker_decode::skip_gzip_header(&gz).expect("header");
    let deflate = &gz[header_len..gz.len() - 8];
    let stream_bits = deflate.len() * 8;

    // Fixed-seed linear congruential generator keeps the sample reproducible.
    let mut state: u64 = 0xdeadbeef_cafebabe;
    let n = 1000;
    let accepted = (0..n)
        .filter(|_| {
            state = state.wrapping_mul(6364136223846793005).wrapping_add(1);
            try_decode_at(deflate, (state as usize) % stream_bits)
        })
        .count();

    let rate = accepted as f64 / n as f64;
    assert!(
        rate < 0.01,
        "false positive rate {:.2}% ({}/{}) exceeds 1%",
        rate * 100.0,
        accepted,
        n
    );
}
/// Decodes several chunks at oracle boundaries and checks that
/// `resolve_and_write` stitches them back into the original input.
/// Skipped when fewer than three boundaries exist.
#[test]
fn test_resolve_window_propagates_across_chunks() {
    let original = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&original);
    let header_len =
        crate::decompress::parallel::marker_decode::skip_gzip_header(&gz).expect("header");
    let deflate = &gz[header_len..gz.len() - 8];

    let boundaries = get_oracle_boundaries(deflate, 2 * 1024 * 1024);
    if boundaries.len() < 3 {
        return;
    }

    let chunks = decode_chunks_parallel(deflate, &boundaries, 4).expect("decode");
    let crc = gzip_crc32(&gz);
    let mut output = Vec::new();
    let result = resolve_and_write(deflate, &chunks, &mut output, original.len(), crc);
    assert!(
        result.is_ok(),
        "multi-chunk resolve failed: {:?}",
        result.err()
    );
    assert_eq!(output, original, "multi-chunk output mismatch");
}
/// For three data patterns, whenever parallel decompression succeeds the
/// CRC32 of its output must equal the value from `gzip_crc32`.
#[test]
fn test_output_crc32_matches_for_each_data_pattern() {
    let cases = [
        ("compressible", make_compressible_data(8 * 1024 * 1024)),
        ("text", make_text_data(8 * 1024 * 1024)),
        ("rle", make_rle_data(8 * 1024 * 1024)),
    ];
    for (label, payload) in cases {
        let gz = make_gzip_data(&payload);
        let mut output = Vec::new();
        // A failed parallel decode is tolerated; only successful output is checked.
        if decompress_parallel(&gz, &mut output, 4).is_err() {
            continue;
        }
        let expected_crc = gzip_crc32(&gz);
        let actual_crc = compute_crc32(&output);
        assert_eq!(
            actual_crc, expected_crc,
            "{}: CRC mismatch {:#010x} vs {:#010x}",
            label, actual_crc, expected_crc
        );
    }
}
/// The little-endian u32 in the last four bytes of the gzip file (ISIZE)
/// must equal the uncompressed length.
#[test]
fn test_isize_matches_output_length() {
    let data = make_compressible_data(8 * 1024 * 1024);
    let compressed = make_gzip_data(&data);

    let mut isize_bytes = [0u8; 4];
    isize_bytes.copy_from_slice(&compressed[compressed.len() - 4..]);
    let isize_val = u32::from_le_bytes(isize_bytes) as usize;

    assert_eq!(isize_val, data.len(), "ISIZE should equal data length");
}
/// When parallel decompression succeeds, its output must match what
/// `flate2::read::GzDecoder` produces for the same file.
#[test]
fn test_parallel_output_byte_identical_to_flate2() {
    let payload = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&payload);

    let mut reference = Vec::new();
    let mut gz_reader = flate2::read::GzDecoder::new(&gz[..]);
    std::io::Read::read_to_end(&mut gz_reader, &mut reference).unwrap();

    let mut parallel = Vec::new();
    // A failed parallel decode is tolerated; only successful output is compared.
    let Ok(written) = decompress_parallel(&gz, &mut parallel, 4) else {
        return;
    };
    assert_eq!(written as usize, reference.len());
    assert_eq!(parallel, reference, "parallel != flate2 reference");
}
/// When parallel decompression succeeds, its output must match the result
/// of the in-crate `inflate_consume_first` decoder on the same stream.
#[test]
fn test_parallel_output_byte_identical_to_consume_first() {
    let payload = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&payload);
    let header_len =
        crate::decompress::parallel::marker_decode::skip_gzip_header(&gz).expect("header");
    let deflate = &gz[header_len..gz.len() - 8];

    // Reference buffer sized with 64 KiB of headroom beyond the expected output.
    let mut reference = vec![0u8; payload.len() + 65536];
    let reference_len = crate::decompress::inflate::consume_first_decode::inflate_consume_first(
        deflate,
        &mut reference,
    )
    .expect("reference inflate");
    reference.truncate(reference_len);

    let mut parallel = Vec::new();
    // A failed parallel decode is tolerated; only successful output is compared.
    let Ok(written) = decompress_parallel(&gz, &mut parallel, 4) else {
        return;
    };
    assert_eq!(written as usize, reference_len);
    assert_eq!(
        parallel, reference,
        "parallel != consume_first reference"
    );
}
/// Passing a deliberately wrong expected size to `resolve_and_write` must
/// produce an error and leave the output writer empty.
/// Skipped when fewer than two oracle boundaries exist.
#[test]
fn test_output_never_written_before_size_check() {
    let payload = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&payload);
    let header_len =
        crate::decompress::parallel::marker_decode::skip_gzip_header(&gz).expect("header");
    let deflate = &gz[header_len..gz.len() - 8];

    let oracle = get_oracle_boundaries(deflate, 4 * 1024 * 1024);
    if oracle.len() < 2 {
        return;
    }

    let chunks = decode_chunks_parallel(deflate, &oracle[..2], 1).expect("decode");
    let mut sink = Vec::new();
    // 999999999 is an intentionally bogus expected size; the CRC argument is a dummy 0.
    let result = resolve_and_write(deflate, &chunks, &mut sink, 999999999, 0);
    assert!(result.is_err(), "should detect size mismatch");
    assert!(
        sink.is_empty(),
        "on error, no output should be written (buffered write)"
    );
}
/// Chunk results must be returned in stream order: the first starts at bit 0
/// and every later chunk starts after its predecessor.
/// Skipped when fewer than two boundaries are found or decoding fails.
#[test]
fn test_chunk_results_in_order() {
    let payload = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&payload);
    let header_len =
        crate::decompress::parallel::marker_decode::skip_gzip_header(&gz).expect("header");
    let deflate = &gz[header_len..gz.len() - 8];

    let boundaries = find_chunk_boundaries(deflate, 4);
    if boundaries.len() < 2 {
        return;
    }
    let Ok(chunks) = decode_chunks_parallel(deflate, &boundaries, 4) else {
        return;
    };

    assert_eq!(
        chunks[0].start_bit, 0,
        "chunk 0 must start at bit 0 — results may be out of order"
    );
    for (prev_idx, pair) in chunks.windows(2).enumerate() {
        let (prev, cur) = (&pair[0], &pair[1]);
        assert!(
            cur.start_bit > prev.start_bit,
            "chunks not in order: chunk[{}].start={} <= chunk[{}].start={}",
            prev_idx + 1,
            cur.start_bit,
            prev_idx,
            prev.start_bit
        );
    }
}
/// Runs parallel decompression with 4, 8 and 16 threads; every run that
/// succeeds must produce the same output as the other successful runs.
#[test]
fn test_thread_count_4_8_16_same_output() {
    let payload = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&payload);

    let mut successes: Vec<(usize, Vec<u8>)> = Vec::new();
    for threads in [4, 8, 16] {
        let mut output = Vec::new();
        if decompress_parallel(&gz, &mut output, threads).is_err() {
            continue;
        }
        successes.push((threads, output));
    }

    // Comparing adjacent successful runs is enough to establish equality of all of them.
    for pair in successes.windows(2) {
        let (lhs_threads, lhs_out) = (pair[0].0, &pair[0].1);
        let (rhs_threads, rhs_out) = (pair[1].0, &pair[1].1);
        assert_eq!(
            lhs_out.len(),
            rhs_out.len(),
            "T{} and T{} produced different sizes",
            lhs_threads,
            rhs_threads
        );
        assert_eq!(
            lhs_out, rhs_out,
            "T{} and T{} produced different output",
            lhs_threads, rhs_threads
        );
    }
}
/// With a single boundary at bit 0, `decode_chunks_parallel` must succeed and
/// return exactly one chunk that starts at bit 0 and spans at least one bit.
// NOTE(review): the test name says "returns_error", but the assertions require
// success. The name is kept so existing test filters still match.
#[test]
fn test_single_boundary_returns_error() {
    let payload = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&payload);
    let header_len =
        crate::decompress::parallel::marker_decode::skip_gzip_header(&gz).expect("header");
    let deflate = &gz[header_len..gz.len() - 8];

    let boundaries = vec![0usize];
    let chunks = match decode_chunks_parallel(deflate, &boundaries, 4) {
        Ok(chunks) => chunks,
        Err(_) => panic!("single boundary should still decode"),
    };
    assert_eq!(chunks.len(), 1);
    assert_eq!(chunks[0].start_bit, 0, "single chunk must start at bit 0");
    assert!(chunks[0].end_bit > 0, "single chunk must span some bits");
}
/// Overwrites two bytes 100 bytes past the second chunk boundary with 0xFF
/// and runs `decode_chunks_parallel` over the damaged stream.
///
/// The result is ignored: the test only requires that the call returns
/// rather than panicking or hanging. Skipped when fewer than three boundaries
/// are found or the corruption site would fall outside the stream.
#[test]
fn test_error_propagation_across_threads() {
    let data = make_compressible_data(8 * 1024 * 1024);
    let compressed = make_gzip_data(&data);
    let header_size = crate::decompress::parallel::marker_decode::skip_gzip_header(&compressed)
        .expect("header");
    let mut deflate = compressed[header_size..compressed.len() - 8].to_vec();

    let boundaries = find_chunk_boundaries(&deflate, 4);
    if boundaries.len() < 3 {
        return;
    }

    let corrupt_byte = boundaries[1] / 8 + 100;
    // Two consecutive bytes are overwritten, so the *second* index must be in
    // range too; checking only `corrupt_byte` would allow an out-of-bounds write.
    if corrupt_byte + 1 < deflate.len() {
        deflate[corrupt_byte] = 0xFF;
        deflate[corrupt_byte + 1] = 0xFF;
        let _ = decode_chunks_parallel(&deflate, &boundaries, 4);
    }
}
/// Builds a gzip file whose header carries an FEXTRA field and an FNAME
/// field, then checks that `skip_gzip_header` reports the full header length.
///
/// Expected length: 10 fixed bytes + 2-byte XLEN + extra payload + file name
/// + its NUL terminator (RFC 1952 header layout).
#[test]
fn test_gzip_header_with_extra_fields() {
    use std::io::Write;
    let extra: &[u8] = &[b'A', b'P', 2, 0, 0xAB, 0xCD];
    let filename = "data.bin";
    let data = make_compressible_data(64 * 1024);

    // GzBuilder is needed to set optional header fields; a plain GzEncoder
    // writes only the minimal header and would not exercise FEXTRA parsing.
    let mut encoder = flate2::GzBuilder::new()
        .extra(extra)
        .filename(filename)
        .write(Vec::new(), flate2::Compression::default());
    encoder.write_all(&data).unwrap();
    let compressed = encoder.finish().unwrap();

    let header_size = crate::decompress::parallel::marker_decode::skip_gzip_header(&compressed);
    assert!(
        header_size.is_ok(),
        "should parse gzip header with extra fields"
    );
    let expected = 10 + 2 + extra.len() + filename.len() + 1;
    assert_eq!(
        header_size.unwrap(),
        expected,
        "header size must cover FEXTRA and FNAME fields"
    );
}
/// A gzip file produced by `make_gzip_data` for a tiny payload must be at
/// least 18 bytes long and have a 10-byte header.
#[test]
fn test_gzip_header_minimal() {
    let compressed = make_gzip_data(b"test");
    assert!(compressed.len() >= 18, "minimum gzip is 18 bytes");

    let header_size = crate::decompress::parallel::marker_decode::skip_gzip_header(&compressed);
    assert!(header_size.is_ok());
    let parsed_len = header_size.unwrap();
    assert_eq!(parsed_len, 10, "minimal header is 10 bytes");
}
/// For a five-byte payload, boundary discovery must return only the start
/// boundary at bit 0.
#[test]
fn test_exactly_one_deflate_block() {
    let gz = make_gzip_data(b"hello");
    let header_len =
        crate::decompress::parallel::marker_decode::skip_gzip_header(&gz).expect("header");
    let deflate = &gz[header_len..gz.len() - 8];

    let found = find_chunk_boundaries(deflate, 4);
    assert_eq!(found, vec![0], "tiny data should have one boundary");
}
/// Checks parallel decompression of a stream compressed at gzip level 1.
///
/// The level-1 stream is decoded directly: on success the output must match
/// the input; any error is tolerated. The original
/// `assert_parallel_correct_or_error` call is kept as well, but it receives
/// only the uncompressed data and so cannot see the level-1 stream.
#[test]
fn test_many_tiny_deflate_blocks() {
    let data = make_compressible_data(4 * 1024 * 1024);
    let compressed = make_gzip_at_level(&data, 1);

    let mut output = Vec::new();
    // An error (for example, input below the parallel threshold) is acceptable;
    // a successful result with wrong content is not.
    if let Ok(bytes) = decompress_parallel(&compressed, &mut output, 4) {
        assert_eq!(
            bytes as usize,
            data.len(),
            "many_blocks_L1: size mismatch {} vs {}",
            bytes,
            data.len()
        );
        assert_eq!(output, data, "many_blocks_L1: content mismatch");
    }

    assert_parallel_correct_or_error("many_blocks_L1", &data);
}
/// Boundary discovery on compressed random data must return at least one
/// boundary.
// NOTE(review): presumably random input makes the encoder emit stored blocks —
// not verified by this test.
#[test]
fn test_boundary_at_stored_block() {
    let payload = make_random_data(4 * 1024 * 1024, 12345);
    let gz = make_gzip_data(&payload);
    let header_len =
        crate::decompress::parallel::marker_decode::skip_gzip_header(&gz).expect("header");
    let deflate = &gz[header_len..gz.len() - 8];

    let found = find_chunk_boundaries(deflate, 4);
    assert!(
        !found.is_empty(),
        "should find at least the start boundary"
    );
}
/// Uses 4 MiB built by repeating a 32768-byte pattern, then checks that
/// decoding between consecutive oracle boundaries stops exactly on the next
/// boundary. Skipped when fewer than three boundaries exist.
#[test]
fn test_max_distance_backreference_at_boundary() {
    // The pattern repeats every 32768 bytes; 4 MiB is an exact multiple of that.
    let data: Vec<u8> = (0..32768)
        .map(|i| (i % 251) as u8)
        .cycle()
        .take(4 * 1024 * 1024)
        .collect();
    let gz = make_gzip_data(&data);
    let header_len =
        crate::decompress::parallel::marker_decode::skip_gzip_header(&gz).expect("header");
    let deflate = &gz[header_len..gz.len() - 8];

    let boundaries = get_oracle_boundaries(deflate, 512 * 1024);
    if boundaries.len() < 3 {
        return;
    }

    for (i, pair) in boundaries.windows(2).enumerate() {
        let (start_bit, end_bit) = (pair[0], pair[1]);
        let first_byte = start_bit / 8;
        let last_byte = (end_bit / 8 + 64 * 1024).min(deflate.len());
        let mut decoder = MarkerDecoder::new(&deflate[first_byte..last_byte], start_bit % 8);

        let relative_end = end_bit - first_byte * 8;
        let _ = decoder.decode_until_bit(usize::MAX, relative_end);
        assert_eq!(
            decoder.bit_position(),
            relative_end,
            "max-distance chunk {}: overshoot",
            i
        );
    }
}
/// Smoke test: passes two identical boundaries (a zero-length first chunk) to
/// `decode_chunks_parallel` and discards the result. It asserts nothing about
/// the outcome.
#[test]
fn test_chunk_compressed_size_zero() {
    let payload = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&payload);
    let header_len =
        crate::decompress::parallel::marker_decode::skip_gzip_header(&gz).expect("header");
    let deflate = &gz[header_len..gz.len() - 8];

    let degenerate = vec![0, 0];
    let _ = decode_chunks_parallel(deflate, &degenerate, 2);
}
/// Decompresses 8 MiB of compressible data. Success must reproduce the
/// input; `TooSmall` is accepted silently; any other error is only logged.
// NOTE(review): the name refers to "exactly 4MB compressed", but the
// compressed size is not controlled or checked here.
#[test]
fn test_data_exactly_4mb_compressed() {
    let payload = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&payload);

    let mut output = Vec::new();
    match decompress_parallel(&gz, &mut output, 4) {
        Ok(written) => {
            assert_eq!(written as usize, payload.len());
            assert_eq!(output, payload);
        }
        Err(ParallelError::TooSmall) => {}
        Err(e) => eprintln!("4MB boundary test: {}", e),
    }
}
/// For several seeds of random data, parallel decompression may fail, but if
/// it succeeds the size and content must match the input exactly.
#[test]
fn test_property_parallel_never_silently_wrong() {
    for seed in [1u64, 42, 100, 999, 0xdeadbeef] {
        let payload = make_random_data(512 * 1024, seed);
        let gz = make_gzip_data(&payload);

        let mut output = Vec::new();
        // An error is an acceptable outcome; wrong output is not.
        let Ok(written) = decompress_parallel(&gz, &mut output, 4) else {
            continue;
        };
        assert_eq!(
            written as usize,
            payload.len(),
            "seed {}: size mismatch {} vs {}",
            seed,
            written,
            payload.len()
        );
        assert_eq!(output, payload, "seed {}: SILENT WRONG OUTPUT", seed);
    }
}
/// For two data patterns, decodes chunks at oracle boundaries and then
/// re-inflates each chunk sequentially with the window carried over from the
/// previous chunk. The concatenated output must equal the input.
/// A pattern is skipped when it yields fewer than three boundaries.
#[test]
fn test_property_oracle_pipeline_always_correct() {
    let cases = [
        ("compressible", make_compressible_data(4 * 1024 * 1024)),
        ("text", make_text_data(4 * 1024 * 1024)),
    ];
    for (label, data) in cases {
        let gz = make_gzip_data(&data);
        let header_len =
            crate::decompress::parallel::marker_decode::skip_gzip_header(&gz).expect("header");
        let deflate = &gz[header_len..gz.len() - 8];

        let boundaries = get_oracle_boundaries(deflate, 512 * 1024);
        if boundaries.len() < 3 {
            continue;
        }

        let chunks = match decode_chunks_parallel(deflate, &boundaries, 4) {
            Ok(chunks) => chunks,
            Err(_) => panic!("{}: oracle decode should never fail", label),
        };

        let mut output = Vec::new();
        let mut window = Vec::<u8>::new();
        for chunk in chunks.iter() {
            // Limit the input to the byte containing this chunk's end bit.
            let input_end = chunk.end_bit.div_ceil(8).min(deflate.len());
            let decoded = crate::backends::inflate_bit::decompress_deflate_from_bit_with_end(
                &deflate[..input_end],
                chunk.start_bit,
                &window,
                data.len() + 1024,
            );
            let (bytes, _) = match decoded {
                Some(pair) => pair,
                None => panic!(
                    "{}: inflate_bit failed at start_bit={}",
                    label, chunk.start_bit
                ),
            };
            update_window(&mut window, &bytes);
            output.extend_from_slice(&bytes);
        }

        assert_eq!(
            output.len(),
            data.len(),
            "{}: oracle output size {} != expected {}",
            label,
            output.len(),
            data.len()
        );
        assert_eq!(output, data, "{}: oracle output content mismatch", label);
    }
}
/// Decodes the second oracle chunk without a prior window and checks that
/// the second `WINDOW_SIZE` span of output has no more markers (values above
/// 255) than the first. Skipped when fewer than three boundaries exist or the
/// chunk yields at most two windows of output.
#[test]
fn test_property_marker_rate_decreases_over_chunk() {
    let payload = make_compressible_data(4 * 1024 * 1024);
    let gz = make_gzip_data(&payload);
    let header_len =
        crate::decompress::parallel::marker_decode::skip_gzip_header(&gz).expect("header");
    let deflate = &gz[header_len..gz.len() - 8];

    let boundaries = get_oracle_boundaries(deflate, 512 * 1024);
    if boundaries.len() < 3 {
        return;
    }

    let (start_bit, end_bit) = (boundaries[1], boundaries[2]);
    let first_byte = start_bit / 8;
    let last_byte = (end_bit / 8 + 64 * 1024).min(deflate.len());
    let mut decoder = MarkerDecoder::new(&deflate[first_byte..last_byte], start_bit % 8);
    let relative_end = end_bit - first_byte * 8;
    let _ = decoder.decode_until_bit(usize::MAX, relative_end);

    let output = decoder.output();
    if output.len() <= WINDOW_SIZE * 2 {
        return;
    }

    let early = &output[..WINDOW_SIZE];
    let late = &output[WINDOW_SIZE..WINDOW_SIZE * 2];
    let early_markers: usize = early.iter().filter(|&&v| v > 255).count();
    let late_markers: usize = late.iter().filter(|&&v| v > 255).count();
    let early_rate = early_markers as f64 / WINDOW_SIZE as f64;
    let late_rate = late_markers as f64 / WINDOW_SIZE as f64;
    assert!(
        late_rate <= early_rate,
        "marker rate increased: early {:.1}% -> late {:.1}%",
        early_rate * 100.0,
        late_rate * 100.0
    );
}
/// Among the interior chunks (first and last excluded), the largest
/// compressed span must be less than ten times the smallest.
/// Skipped when fewer than four boundaries are found, decoding fails, fewer
/// than two interior chunks exist, or the smallest span is zero.
#[test]
fn test_property_chunk_output_monotonic() {
    let payload = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&payload);
    let header_len =
        crate::decompress::parallel::marker_decode::skip_gzip_header(&gz).expect("header");
    let deflate = &gz[header_len..gz.len() - 8];

    let boundaries = find_chunk_boundaries(deflate, 8);
    if boundaries.len() < 4 {
        return;
    }
    let Ok(chunks) = decode_chunks_parallel(deflate, &boundaries, 8) else {
        return;
    };

    let interior = &chunks[1..chunks.len() - 1];
    let spans: Vec<usize> = interior.iter().map(|c| c.end_bit - c.start_bit).collect();
    if spans.len() < 2 {
        return;
    }

    let max_size = *spans.iter().max().unwrap();
    let min_size = *spans.iter().min().unwrap();
    if min_size == 0 {
        return;
    }
    let ratio = max_size as f64 / min_size as f64;
    assert!(
        ratio < 10.0,
        "mid-chunk size ratio {:.1}x (max={}, min={}) — likely false positive",
        ratio,
        max_size,
        min_size
    );
}
/// Runs `find_chunk_end` from bit 0 on input truncated at the second oracle
/// boundary. The returned end bit must be close to that boundary, and
/// inflating the same slice must reproduce a prefix of the original data.
#[test]
fn test_find_chunk_end_bit0_produces_valid_end() {
    let data = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&data);
    let header_len = crate::decompress::parallel::marker_decode::skip_gzip_header(&gz)
        .expect("valid header");
    let deflate = &gz[header_len..gz.len() - 8];

    let oracle = get_oracle_boundaries(deflate, 1024 * 1024);
    assert!(oracle.len() >= 2, "need oracle boundaries");

    let until_bit = oracle[1];
    // Truncate the input at the byte that contains the target boundary.
    let truncated = &deflate[..until_bit.div_ceil(8).min(deflate.len())];
    let end_bit = find_chunk_end(truncated, 0, 16 * 1024 * 1024)
        .expect("chunk 0 from bit 0 should decode");

    let same_byte = end_bit.div_ceil(8) == until_bit.div_ceil(8);
    assert!(
        same_byte || end_bit <= until_bit + 8,
        "end_bit {} too far from oracle boundary {} (diff {} bits)",
        end_bit,
        until_bit,
        end_bit.abs_diff(until_bit)
    );

    let (chunk_bytes, _) = crate::backends::inflate_bit::decompress_deflate_from_bit_with_end(
        truncated,
        0,
        &[],
        16 * 1024 * 1024,
    )
    .expect("inflate_bit decode");
    assert_eq!(
        &chunk_bytes,
        &data[..chunk_bytes.len()],
        "chunk 0 output doesn't match sequential reference"
    );
}
/// The end bit returned by `find_chunk_end` for the first chunk must not
/// exceed the second oracle boundary by more than 8 bits.
#[test]
fn test_find_chunk_end_is_absolute_bit_position() {
    let payload = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&payload);
    let header_len = crate::decompress::parallel::marker_decode::skip_gzip_header(&gz)
        .expect("valid header");
    let deflate = &gz[header_len..gz.len() - 8];

    let oracle = get_oracle_boundaries(deflate, 1024 * 1024);
    assert!(oracle.len() >= 3);

    let target = oracle[1];
    let truncated = &deflate[..target.div_ceil(8).min(deflate.len())];
    let end_bit = find_chunk_end(truncated, 0, 16 * 1024 * 1024).expect("chunk 0 decode");
    assert!(
        end_bit <= target + 8,
        "end_bit {} should be close to oracle boundary {} (absolute bit position)",
        end_bit,
        target
    );
}
/// `find_chunk_end` and `decompress_deflate_from_bit_with_end` must report
/// the same end bit for the first chunk.
/// Skipped when fewer than three oracle boundaries exist.
#[test]
fn test_find_chunk_end_handoff_to_sequential() {
    let payload = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&payload);
    let header_len = crate::decompress::parallel::marker_decode::skip_gzip_header(&gz)
        .expect("valid header");
    let deflate = &gz[header_len..gz.len() - 8];

    let oracle = get_oracle_boundaries(deflate, 2 * 1024 * 1024);
    if oracle.len() < 3 {
        return;
    }

    let (b0, b1) = (oracle[0], oracle[1]);
    // Input for chunk 0, truncated at the byte containing the second boundary.
    let slice0 = &deflate[..b1.div_ceil(8).min(deflate.len())];

    let (chunk0_bytes, chunk0_end) =
        crate::backends::inflate_bit::decompress_deflate_from_bit_with_end(
            slice0,
            b0,
            &[],
            8 * 1024 * 1024,
        )
        .expect("chunk0 inflate");
    assert!(!chunk0_bytes.is_empty(), "chunk 0 should produce output");
    assert!(chunk0_end <= slice0.len() * 8, "chunk0_end in range");

    let fce_end = find_chunk_end(slice0, b0, 8 * 1024 * 1024).expect("find_chunk_end");
    assert_eq!(
        fce_end, chunk0_end,
        "find_chunk_end must match decompress end_bit"
    );
}
/// `find_chunk_end` must return `None` when the start bit lies past the end
/// of the input.
#[test]
fn test_find_chunk_end_returns_none_past_end() {
    let payload = make_compressible_data(1024 * 1024);
    let gz = make_gzip_data(&payload);
    let header_len = crate::decompress::parallel::marker_decode::skip_gzip_header(&gz)
        .expect("valid header");
    let deflate = &gz[header_len..gz.len() - 8];

    // 100 bits beyond the last bit of the stream.
    let start_past_end = deflate.len() * 8 + 100;
    let outcome = find_chunk_end(deflate, start_past_end, usize::MAX);
    assert!(
        outcome.is_none(),
        "should return None for start past data end"
    );
}
/// Decodes sequentially between the second and third oracle boundaries,
/// seeded with the window of the first `scan_deflate_fast` checkpoint, and
/// expects non-empty output.
// NOTE(review): assumes the first scan checkpoint corresponds to oracle[1] — confirm.
#[test]
fn test_decode_sequential_from_with_window() {
    let payload = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&payload);
    let header_len = crate::decompress::parallel::marker_decode::skip_gzip_header(&gz)
        .expect("valid header");
    let deflate = &gz[header_len..gz.len() - 8];

    let oracle = get_oracle_boundaries(deflate, 1024 * 1024);
    assert!(oracle.len() >= 3);

    let scan = crate::decompress::scan_inflate::scan_deflate_fast(deflate, 1024 * 1024, 0)
        .expect("scan");
    let window = &scan.checkpoints[0].window;

    let (from_bit, to_bit) = (oracle[1], oracle[2]);
    let (bytes, _end_bit) =
        decode_sequential_from(deflate, from_bit, Some(to_bit), window, payload.len())
            .expect("sequential decode with window");
    assert!(
        !bytes.is_empty(),
        "sequential decode with window produced no output"
    );
}
/// `decode_sequential_from` must return an error when the start bit lies
/// past the end of the input.
#[test]
fn test_decode_sequential_from_empty_at_end() {
    let payload = make_compressible_data(1024 * 1024);
    let gz = make_gzip_data(&payload);
    let header_len = crate::decompress::parallel::marker_decode::skip_gzip_header(&gz)
        .expect("valid header");
    let deflate = &gz[header_len..gz.len() - 8];

    // 100 bits beyond the last bit of the stream.
    let start_past_end = deflate.len() * 8 + 100;
    let result = decode_sequential_from(deflate, start_past_end, None, &[], payload.len() + 1024);
    assert!(
        result.is_err(),
        "past-end start_bit should return DecodeFailed, got {:?}",
        result.as_ref().ok().map(|(b, e)| (b.len(), e))
    );
}
/// Any boundary reported by `search_boundary_forward` must sit at or after
/// the partition bit the search started from.
#[test]
fn test_search_boundary_forward_returns_forward_only() {
    let plain = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&plain);
    let body_start = crate::decompress::parallel::marker_decode::skip_gzip_header(&gz)
        .expect("valid header");
    let body = &gz[body_start..gz.len() - 8];

    // Probe the three interior quarter points of the bit stream.
    let quarter_bits = body.len() * 8 / 4;
    for partition_bit in (1..4).map(|k| k * quarter_bits) {
        // A search that finds nothing is not a failure for this test.
        let Some(found) = search_boundary_forward(body, partition_bit) else {
            continue;
        };
        assert!(
            found >= partition_bit,
            "search_boundary_forward returned {} < partition {} \
             (must be forward-only)",
            found,
            partition_bit
        );
    }
}
/// Every boundary produced by `search_boundary_forward` must also be
/// accepted by `try_decode_at`.
#[test]
fn test_search_boundary_forward_result_passes_try_decode() {
    let plain = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&plain);
    let body_start = crate::decompress::parallel::marker_decode::skip_gzip_header(&gz)
        .expect("valid header");
    let body = &gz[body_start..gz.len() - 8];

    let step = body.len() * 8 / 4;
    // Lazily search from each interior quarter point; misses are skipped.
    let candidates = (1..4).filter_map(|k| search_boundary_forward(body, k * step));
    for found in candidates {
        assert!(
            try_decode_at(body, found),
            "search_boundary_forward returned {} which fails try_decode_at",
            found
        );
    }
}
/// Boundaries found by `search_boundary_forward` must pass `try_decode_at`
/// and lie less than `SEARCH_RADIUS * 8` bits after the partition point.
/// The number of successful searches is printed, not asserted.
#[test]
fn test_search_boundary_forward_finds_valid_boundary() {
    let plain = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&plain);
    let body_start = crate::decompress::parallel::marker_decode::skip_gzip_header(&gz)
        .expect("valid header");
    let body = &gz[body_start..gz.len() - 8];

    let step = body.len() * 8 / 4;
    let mut hits = 0;
    // Pair each partition point with the boundary found from it, lazily.
    let located = (1..4).map(|k| k * step).filter_map(|partition| {
        search_boundary_forward(body, partition).map(|found| (partition, found))
    });
    for (partition, found) in located {
        hits += 1;
        assert!(
            try_decode_at(body, found),
            "found boundary at {} doesn't pass try_decode_at",
            found
        );
        assert!(
            found < partition + SEARCH_RADIUS * 8,
            "found boundary {} too far from partition {}",
            found,
            partition
        );
    }
    eprintln!("search_boundary_forward: found {}/3 boundaries", hits);
}
/// Builds one speculative chunk per oracle boundary (each located with
/// `find_chunk_end`) and checks that `confirm_resolve_write` reproduces the
/// original data from them.
#[test]
fn test_confirm_all_hits() {
    let plain = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&plain);
    let body_start = crate::decompress::parallel::marker_decode::skip_gzip_header(&gz)
        .expect("valid header");
    let body = &gz[body_start..gz.len() - 8];

    let oracle = get_oracle_boundaries(body, 2 * 1024 * 1024);
    // Too few boundaries to exercise the multi-chunk path; nothing to check.
    if oracle.len() <= 2 {
        return;
    }

    let total_bits = body.len() * 8;
    let specs: Vec<Option<SpeculativeChunk>> = oracle
        .iter()
        .enumerate()
        // Stop at the first boundary at or beyond the end of the stream.
        .take_while(|&(_, &start)| start < total_bits)
        .map(|(idx, &start)| {
            // Limit the input to the bytes up to the next boundary, if any.
            let limited = match oracle.get(idx + 1) {
                Some(&next) => &body[..next.div_ceil(8).min(body.len())],
                None => body,
            };
            find_chunk_end(limited, start, 8 * 1024 * 1024).map(|end_bit| SpeculativeChunk {
                start_bit: start,
                end_bit,
            })
        })
        .collect();

    let expected_crc = gzip_crc32(&gz);
    let mut sink = Vec::new();
    let outcome = confirm_resolve_write(body, &specs, plain.len(), expected_crc, &mut sink);
    assert!(
        outcome.is_ok(),
        "all-hits should succeed: {:?}",
        outcome.err()
    );
    assert_eq!(sink, plain, "all-hits output should match sequential");
}
/// With four `None` speculative slots, `confirm_resolve_write` must still
/// succeed and emit output identical to the original data.
#[test]
fn test_confirm_all_misses() {
    let plain = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&plain);
    let body_start = crate::decompress::parallel::marker_decode::skip_gzip_header(&gz)
        .expect("valid header");
    let body = &gz[body_start..gz.len() - 8];

    // Four slots, none of which carries a speculative result.
    let specs: Vec<Option<SpeculativeChunk>> =
        std::iter::repeat_with(|| None).take(4).collect();
    let expected_crc = gzip_crc32(&gz);

    let mut sink = Vec::new();
    let outcome = confirm_resolve_write(body, &specs, plain.len(), expected_crc, &mut sink);
    assert!(
        outcome.is_ok(),
        "all-misses should succeed via sequential fallback: {:?}",
        outcome.err()
    );
    assert_eq!(sink, plain, "all-misses output should match sequential");
}
/// Runs the whole `decompress_parallel` pipeline with 4 threads over
/// gzip-compressed text data and, when it succeeds, checks both the reported
/// byte count and the decompressed content against the original.
///
/// An `Err` from `decompress_parallel` is tolerated (the function can refuse
/// an input, e.g. with `ParallelError::TooSmall`), but it is logged so a run
/// that verified nothing is visible in the test output instead of passing
/// silently.
#[test]
fn test_confirm_mixed_hits_misses() {
    let data = make_text_data(32 * 1024 * 1024);
    let compressed = make_gzip_data(&data);
    let mut output = Vec::new();
    match decompress_parallel(&compressed, &mut output, 4) {
        Ok(bytes) => {
            assert_eq!(bytes as usize, data.len(), "parallel size mismatch");
            assert_eq!(output, data, "parallel content mismatch");
        }
        Err(err) => {
            // Deliberately not a failure; surface it so the skip is not silent.
            eprintln!(
                "test_confirm_mixed_hits_misses: decompress_parallel returned {:?}; \
                 output was not verified",
                err
            );
        }
    }
}
/// Table-driven check of `find_next_spec_start` against a map with keys
/// 100, 200 and 300: the result is the smallest key strictly greater than
/// the queried bit.
#[test]
fn test_find_next_spec_start_basic() {
    let starts = std::collections::HashMap::from([(100usize, 0usize), (200, 1), (300, 2)]);
    // (after_bit, expected next start)
    let cases = [(50, 100), (100, 200), (150, 200), (200, 300), (250, 300)];
    for (after_bit, expected) in cases {
        assert_eq!(find_next_spec_start(&starts, after_bit, 1000), expected);
    }
}
/// With no entries in the map, `find_next_spec_start` must return the
/// `total_bits` argument (1000 here).
#[test]
fn test_find_next_spec_start_empty_map() {
    let no_starts = std::collections::HashMap::new();
    let next = find_next_spec_start(&no_starts, 0, 1000);
    assert_eq!(next, 1000, "empty map should return total_bits");
}
/// When the queried bit (300) is larger than every key in the map,
/// `find_next_spec_start` must return the `total_bits` argument.
#[test]
fn test_find_next_spec_start_past_all() {
    let starts = std::collections::HashMap::from([(100usize, 0usize), (200, 1)]);
    let next = find_next_spec_start(&starts, 300, 1000);
    assert_eq!(
        next, 1000,
        "after_bit past all entries should return total_bits"
    );
}
/// Decodes chunk 0 up to the second oracle boundary and requires that the
/// end bit reported by `find_chunk_end` is within 7 bits of that boundary.
/// Whether `search_boundary_forward` lands on the same bit is only logged.
#[test]
fn test_speculation_hit_invariant() {
    let plain = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&plain);
    let body_start = crate::decompress::parallel::marker_decode::skip_gzip_header(&gz)
        .expect("valid header");
    let body = &gz[body_start..gz.len() - 8];

    let oracle = get_oracle_boundaries(body, 2 * 1024 * 1024);
    // Not enough boundaries to compare against; nothing to check.
    if oracle.len() <= 2 {
        return;
    }
    let boundary = oracle[1];

    // Chunk 0 sees only the bytes up to the boundary, rounded up to a byte.
    let first_chunk = &body[..boundary.div_ceil(8).min(body.len())];
    let chunk0_end = find_chunk_end(first_chunk, 0, 16 * 1024 * 1024).expect("chunk 0 decode");

    // Informational only: report whether the forward search agrees.
    match search_boundary_forward(body, boundary) {
        Some(found) if found == chunk0_end => eprintln!(
            "speculation hit: chunk0 end_bit={} == search result={}",
            chunk0_end, found
        ),
        Some(found) => eprintln!(
            "speculation miss: chunk0 end_bit={}, search found={}, oracle={}",
            chunk0_end, found, boundary
        ),
        None => {}
    }

    let diff = chunk0_end.abs_diff(boundary);
    assert!(
        diff <= 7,
        "chunk0 end_bit {} differs from oracle boundary {} by {} bits (max 7) — \
         bit arithmetic is wrong in find_chunk_end",
        chunk0_end,
        boundary,
        diff
    );
}
/// `max_output_for_chunk` must equal the larger of twice the per-chunk share
/// of ISIZE and eight times the compressed size.
#[test]
fn test_max_output_for_chunk_new_signature() {
    let (isize_total, num_chunks, compressed) = (100 * 1024 * 1024, 4, 10 * 1024 * 1024);
    let actual = max_output_for_chunk(isize_total, num_chunks, compressed);

    let isize_based = (isize_total / num_chunks) * 2;
    let ratio_based = compressed * 8;
    let expected = isize_based.max(ratio_based);
    assert_eq!(
        actual, expected,
        "should be max of isize-based ({}) and ratio-based ({})",
        isize_based, ratio_based
    );
}
/// For a small compressed size (50 KiB) paired with a large ISIZE, the limit
/// must not fall below twice the per-chunk share of ISIZE.
#[test]
fn test_max_output_for_chunk_high_ratio_data() {
    let (isize_total, num_chunks, compressed) = (100 * 1024 * 1024, 4, 50 * 1024);
    let limit = max_output_for_chunk(isize_total, num_chunks, compressed);
    let isize_based = (isize_total / num_chunks) * 2;
    assert!(
        limit >= isize_based,
        "high-ratio: result {} < isize_based {} — would truncate RLE/zeros output",
        limit,
        isize_based
    );
}
/// With an ISIZE of zero, 4 chunks and 1 MiB of compressed input, the limit
/// must be exactly 8 MiB.
#[test]
fn test_max_output_for_chunk_zero_isize() {
    let limit = max_output_for_chunk(0, 4, 1024 * 1024);
    assert_eq!(
        limit,
        8 * 1024 * 1024,
        "zero ISIZE should use ratio-based limit"
    );
}
/// `verify_output` must report `SizeMismatch` when the expected size (100)
/// disagrees with the 3-byte buffer.
#[test]
fn test_verify_output_size_mismatch() {
    let three_bytes = vec![1, 2, 3];
    let outcome = verify_output(&three_bytes, 100, 0);
    assert!(
        matches!(outcome, Err(ParallelError::SizeMismatch)),
        "should detect size mismatch"
    );
}
/// `verify_output` must report `CrcMismatch` when the size is right but the
/// expected CRC (0xDEADBEEF) is not the buffer's checksum.
#[test]
fn test_verify_output_crc_mismatch() {
    let three_bytes = vec![1, 2, 3];
    let bogus_crc = 0xDEADBEEF;
    let outcome = verify_output(&three_bytes, 3, bogus_crc);
    assert!(
        matches!(outcome, Err(ParallelError::CrcMismatch)),
        "should detect CRC mismatch"
    );
}
#[test]
fn test_verify_output_correct() {
let buffer = vec![1, 2, 3];
let mut hasher = crc32fast::Hasher::new();
hasher.update(&buffer);
let correct_crc = hasher.finalize();
let result = verify_output(&buffer, 3, correct_crc);
assert!(result.is_ok(), "should pass with correct size and CRC");
}
/// An expected CRC of 0 must not cause `verify_output` to reject a buffer
/// whose size matches.
#[test]
fn test_verify_output_zero_crc_skips_check() {
    let three_bytes = vec![1, 2, 3];
    let outcome = verify_output(&three_bytes, 3, 0);
    assert!(outcome.is_ok(), "CRC 0 should skip CRC check");
}
/// An expected size of 0 (together with CRC 0) must not cause
/// `verify_output` to reject a non-empty buffer.
#[test]
fn test_verify_output_zero_size_skips_check() {
    let three_bytes = vec![1, 2, 3];
    let outcome = verify_output(&three_bytes, 0, 0);
    assert!(outcome.is_ok(), "size 0 should skip size check");
}
/// The first slot returned by `speculative_decode_parallel` must be `Some`,
/// start at bit 0 and end at a non-zero bit.
#[test]
fn test_speculative_decode_parallel_chunk0_always_some() {
    let plain = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&plain);
    let body_start = crate::decompress::parallel::marker_decode::skip_gzip_header(&gz)
        .expect("valid header");
    let body = &gz[body_start..gz.len() - 8];

    let quarter_bits = body.len() * 8 / 4;
    let specs = speculative_decode_parallel(body, 4, quarter_bits, plain.len());

    let first = specs[0]
        .as_ref()
        .expect("chunk 0 must always succeed (starts at bit 0)");
    assert_eq!(first.start_bit, 0, "chunk 0 must start at bit 0");
    assert!(first.end_bit > 0, "chunk 0 must span some deflate bits");
}
/// `speculative_decode_parallel` must return exactly one slot per requested
/// chunk, checked for 2, 4 and 8 chunks.
#[test]
fn test_speculative_decode_parallel_produces_correct_count() {
    let plain = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&plain);
    let body_start = crate::decompress::parallel::marker_decode::skip_gzip_header(&gz)
        .expect("valid header");
    let body = &gz[body_start..gz.len() - 8];

    // The stream length does not depend on the chunk count; compute it once.
    let total_bits = body.len() * 8;
    for num_chunks in [2, 4, 8] {
        let produced =
            speculative_decode_parallel(body, num_chunks, total_bits / num_chunks, plain.len())
                .len();
        assert_eq!(
            produced, num_chunks,
            "should produce exactly {} speculative results",
            num_chunks
        );
    }
}
/// Feeding 5 MiB of pseudo-random bytes to `speculative_decode_parallel`
/// must still yield one slot per requested chunk.
#[test]
fn test_speculative_all_misses_still_produces_vec() {
    // Deterministic LCG; the high half of each state supplies one byte.
    let mut state: u64 = 0xbadcafe;
    let noise: Vec<u8> = (0..5 * 1024 * 1024)
        .map(|_| {
            state = state.wrapping_mul(6364136223846793005).wrapping_add(1);
            (state >> 32) as u8
        })
        .collect();

    let num_chunks = 4;
    let spacing = noise.len() * 8 / num_chunks;
    let slots = speculative_decode_parallel(&noise, num_chunks, spacing, 0);
    assert_eq!(slots.len(), num_chunks);
}
/// With every speculative slot set to `None`, `confirm_resolve_write` must
/// still succeed and reproduce the original data, using the size and CRC
/// read from the gzip trailer.
#[test]
fn test_confirm_resolve_write_all_none_falls_back_to_sequential() {
    let plain = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&plain);
    let body_start = crate::decompress::parallel::marker_decode::skip_gzip_header(&gz)
        .expect("valid header");
    let body = &gz[body_start..gz.len() - 8];

    // Little-endian u32 starting at byte offset `at` of the gzip stream.
    let le_u32_at =
        |at: usize| u32::from_le_bytes([gz[at], gz[at + 1], gz[at + 2], gz[at + 3]]);
    // Trailer layout: CRC in the last 8..4 bytes, ISIZE in the last 4.
    let expected_size = le_u32_at(gz.len() - 4) as usize;
    let expected_crc = le_u32_at(gz.len() - 8);

    let specs: Vec<Option<SpeculativeChunk>> = vec![None, None, None, None];
    let mut sink = Vec::new();
    let outcome = confirm_resolve_write(body, &specs, expected_size, expected_crc, &mut sink);
    assert!(
        outcome.is_ok(),
        "all-miss pipeline should still produce correct output via sequential"
    );
    assert_eq!(sink, plain, "output must match original data");
}
/// Decoding 1000 bytes of arithmetic garbage must either fail with
/// `DecodeFailed` or, if it succeeds, produce fewer than 100 000 bytes.
#[test]
fn test_decode_sequential_from_corrupt_returns_error() {
    let garbage: Vec<u8> = (0..1000).map(|n| (n * 37 + 11) as u8).collect();
    let zero_window = vec![0u8; WINDOW_SIZE];
    match decode_sequential_from(&garbage, 0, None, &zero_window, 1024 * 1024) {
        // Garbage may happen to decode; it just must not balloon.
        Ok((bytes, _)) => assert!(
            bytes.len() < 100_000,
            "corrupt data shouldn't produce large output"
        ),
        Err(e) => assert!(matches!(e, ParallelError::DecodeFailed)),
    }
}
/// A stream of 10 000 `0xFF` bytes must be rejected by
/// `decode_sequential_from` with `ParallelError::DecodeFailed`.
#[test]
fn test_decode_sequential_from_corrupt_all_ff_returns_error() {
    let all_ones = vec![0xFF; 10_000];
    let zero_window = vec![0u8; WINDOW_SIZE];
    let result = decode_sequential_from(&all_ones, 0, None, &zero_window, 1024 * 1024);
    assert!(result.is_err(), "all-0xFF deflate should fail");
    assert!(matches!(result.unwrap_err(), ParallelError::DecodeFailed));
}
/// A start bit 100 bits beyond a 100-byte buffer must make
/// `decode_sequential_from` fail with `ParallelError::DecodeFailed`.
#[test]
fn test_decode_sequential_from_past_end() {
    let zeros = vec![0u8; 100];
    let beyond_end = zeros.len() * 8 + 100;
    let result = decode_sequential_from(&zeros, beyond_end, None, &[], 1024);
    assert!(
        result.is_err(),
        "past-end start_bit should return DecodeFailed"
    );
    assert!(matches!(result.unwrap_err(), ParallelError::DecodeFailed));
}
/// Runs speculation with three chunks, discards the results for chunks 1 and
/// 2, and checks that `confirm_resolve_write` still reproduces the original
/// data.
#[test]
fn test_confirm_consecutive_misses_window_propagation() {
    let plain = make_compressible_data(8 * 1024 * 1024);
    let gz = make_gzip_data(&plain);
    let body_start = crate::decompress::parallel::marker_decode::skip_gzip_header(&gz)
        .expect("valid header");
    let body = &gz[body_start..gz.len() - 8];

    // Little-endian u32 starting at byte offset `at` of the gzip stream.
    let le_u32_at =
        |at: usize| u32::from_le_bytes([gz[at], gz[at + 1], gz[at + 2], gz[at + 3]]);
    // Trailer layout: CRC in the last 8..4 bytes, ISIZE in the last 4.
    let expected_size = le_u32_at(gz.len() - 4) as usize;
    let expected_crc = le_u32_at(gz.len() - 8);

    let num_chunks = 3;
    let spacing = body.len() * 8 / num_chunks;
    let mut specs = speculative_decode_parallel(body, num_chunks, spacing, plain.len());
    // Force two back-to-back misses after chunk 0.
    for slot in [1, 2] {
        specs[slot] = None;
    }

    let mut sink = Vec::new();
    let outcome = confirm_resolve_write(body, &specs, expected_size, expected_crc, &mut sink);
    assert!(
        outcome.is_ok(),
        "consecutive misses should produce correct output"
    );
    assert_eq!(
        sink, plain,
        "window must propagate correctly across consecutive sequential decodes"
    );
}
/// Runs the whole `decompress_parallel` pipeline with 4 threads over
/// gzip-compressed text data and, when it succeeds, checks both the reported
/// byte count and the decompressed content against the original.
///
/// An `Err` from `decompress_parallel` is tolerated (the function can refuse
/// an input, e.g. with `ParallelError::TooSmall`), but it is logged so a run
/// that verified nothing is visible in the test output instead of passing
/// silently.
#[test]
fn test_confirm_alternating_hits_and_misses() {
    let data = make_text_data(32 * 1024 * 1024);
    let compressed = make_gzip_data(&data);
    let mut output = Vec::new();
    match decompress_parallel(&compressed, &mut output, 4) {
        Ok(bytes) => {
            assert_eq!(bytes as usize, data.len(), "parallel size mismatch");
            assert_eq!(output, data, "parallel content mismatch");
        }
        Err(err) => {
            // Deliberately not a failure; surface it so the skip is not silent.
            eprintln!(
                "test_confirm_alternating_hits_and_misses: decompress_parallel returned {:?}; \
                 output was not verified",
                err
            );
        }
    }
}
}