parallel_bzip2_decoder/lib.rs
1//! High-performance parallel bzip2 decompression library.
2//!
3//! This library provides efficient parallel decompression of bzip2 files by processing
4//! multiple blocks concurrently. It achieves significant speedups on multi-core systems
5//! compared to sequential decompression.
6//!
7//! # Features
8//!
9//! - **Parallel block decompression**: Utilizes all available CPU cores
10//! - **Streaming API**: Implements `std::io::Read` for easy integration
11//! - **Memory-efficient**: Uses bounded channels to limit memory usage
12//! - **Zero-copy where possible**: Memory-mapped I/O for file access
13//! - **Full bzip2 format support**: Handles both single-stream and multi-stream bzip2 files
14//! - **Error handling**: Comprehensive error reporting with `anyhow` integration
15//!
16//! # Architecture
17//!
18//! The library uses a multi-stage pipeline:
19//!
20//! 1. **Scanning**: Identifies block boundaries using parallel pattern matching
21//! 2. **Decompression**: Processes blocks in parallel using Rayon
22//! 3. **Reordering**: Ensures output maintains correct block order
23//!
24//! # Quick Start
25//!
26//! The easiest way to use this library is through the `Bz2Decoder`:
27//!
28//! ```no_run
29//! use parallel_bzip2_decoder::Bz2Decoder;
30//! use std::io::Read;
31//!
32//! let mut decoder = Bz2Decoder::open("file.bz2").unwrap();
33//! let mut data = Vec::new();
34//! decoder.read_to_end(&mut data).unwrap();
35//! ```
36//!
37//! # Advanced Usage
38//!
39//! For more control, you can use the lower-level functions:
40//!
41//! ```no_run
42//! use parallel_bzip2_decoder::{scan_blocks, decompress_block};
43//!
44//! let compressed_data = std::fs::read("file.bz2").unwrap();
45//! let block_receiver = scan_blocks(&compressed_data);
46//!
47//! for (start_bit, end_bit) in block_receiver {
48//! let decompressed = decompress_block(&compressed_data, start_bit, end_bit).unwrap();
49//! // Process decompressed block...
50//! }
51//! ```
52//!
53//! # Performance
54//!
55//! Performance scales nearly linearly with the number of CPU cores. On an 8-core system,
56//! expect 6-7x speedup compared to single-threaded bzip2 decompression.
57//!
58//! # Thread Safety
59//!
60//! All public types are thread-safe. The library uses Rayon's global thread pool by default,
61//! but creates dedicated pools where needed to avoid deadlocks.
62//!
63//! # Error Handling
64//!
65//! This crate uses `anyhow` for comprehensive error handling. Most functions return
66//! `Result<T, anyhow::Error>` for easy error propagation using the `?` operator.
67//!
68//! # Memory Usage
69//!
70//! The library is designed with memory efficiency in mind:
71//! - Memory-mapped I/O for large files
72//! - Bounded channels to prevent unbounded memory growth
73//! - Buffer reuse in block processing
74//!
75//! # Benchmarks
76//!
77//! Run benchmarks with `cargo bench` to measure performance on your system.
78//! Various benchmark suites test different aspects of performance:
79//! - Decode benchmarks with various file sizes
80//! - Scanner performance
81//! - End-to-end pipeline performance
82
83pub mod decoder;
84pub mod scanner;
85pub use decoder::Bz2Decoder;
86pub use scanner::{extract_bits, MarkerType, Scanner};
87
88use anyhow::{Context, Result};
89use bzip2::read::BzDecoder;
90use crossbeam_channel::bounded;
91use std::collections::HashMap;
92use std::io::Read;
93
94/// Scans bzip2 data for block boundaries and returns them via a channel.
95///
96/// This function spawns background threads to scan the data in parallel and identify
97/// block start and end positions. The results are sent through a channel as
98/// (start_bit, end_bit) tuples representing block boundaries.
99///
100/// # Architecture
101///
102/// The function creates a two-stage pipeline:
103/// 1. **Scanner thread**: Performs parallel chunk-based scanning
104/// 2. **Reordering thread**: Collects chunks and converts markers to block boundaries
105///
106/// # Arguments
107///
108/// * `data` - The bzip2 compressed data to scan
109///
110/// # Returns
111///
112/// A receiver that yields (start_bit, end_bit) tuples for each block found.
113/// The receiver will be closed when all blocks have been identified.
114///
115/// # Performance
116///
117/// - **Channel buffer**: Sized at 100 to balance memory usage and throughput
118/// - **Chunk buffer**: Limited to 4 chunks to prevent excessive memory usage
119/// - **Thread safety**: Creates its own thread pool to avoid deadlock
120///
121/// # Examples
122///
123/// ```no_run
124/// use parallel_bzip2_decoder::scan_blocks;
125///
126/// let data = std::fs::read("file.bz2").unwrap();
127/// let blocks = scan_blocks(&data);
128///
129/// for (start, end) in blocks {
130/// println!("Block from bit {} to bit {}", start, end);
131/// }
132/// ```
133pub fn scan_blocks(data: &[u8]) -> crossbeam_channel::Receiver<(u64, u64)> {
134 // Channel for sending block boundaries to the caller
135 // Buffer size of 100 allows good throughput without excessive memory use
136 let (task_sender, task_receiver) = bounded(100);
137
138 // Clone data into an Arc for safe sharing across threads
139 let data_vec = data.to_vec();
140 let data_arc = std::sync::Arc::new(data_vec);
141 let data_clone = data_arc.clone();
142
143 std::thread::spawn(move || {
144 let scanner = Scanner::new();
145 // Small buffer for chunks to prevent scanning too far ahead
146 // This maintains cache locality and limits memory usage
147 let (chunk_tx, chunk_rx) = bounded(4);
148
149 // Spawn the actual scanning in a background thread
150 let scan_data = data_clone.clone();
151 let _scan_handle = std::thread::spawn(move || {
152 scanner.scan_stream(&scan_data, 0, chunk_tx);
153 });
154
155 // Reorder chunks and convert markers to block boundaries
156 let mut chunk_buffer: HashMap<usize, Vec<(u64, MarkerType)>> = HashMap::new();
157 let mut next_chunk_idx = 0;
158 let mut current_block_start: Option<u64> = None;
159
160 for (idx, markers) in chunk_rx {
161 chunk_buffer.insert(idx, markers);
162
163 // Process chunks in order
164 while let Some(markers) = chunk_buffer.remove(&next_chunk_idx) {
165 for (marker_pos, mtype) in markers {
166 match mtype {
167 MarkerType::Block => {
168 // Block marker: end previous block (if any) and start new one
169 if let Some(start) = current_block_start {
170 if task_sender.send((start, marker_pos)).is_err() {
171 return; // Receiver dropped, stop scanning
172 }
173 }
174 current_block_start = Some(marker_pos);
175 }
176 MarkerType::Eos => {
177 // End-of-stream marker: end current block
178 if let Some(start) = current_block_start {
179 if task_sender.send((start, marker_pos)).is_err() {
180 return;
181 }
182 current_block_start = None;
183 }
184 }
185 }
186 }
187 next_chunk_idx += 1;
188 }
189 }
190
191 // Handle edge case: block without EOS marker (truncated file)
192 if let Some(start) = current_block_start {
193 let end = (data_clone.len() as u64) * 8;
194 let _ = task_sender.send((start, end));
195 }
196 });
197
198 task_receiver
199}
200
201/// Decompresses a single bzip2 block and returns the decompressed data.
202///
203/// This is a convenience wrapper around `decompress_block_into` that allocates
204/// the output buffer for you. For better performance when decompressing multiple
205/// blocks, use `decompress_block_into` with reused buffers.
206///
207/// # Arguments
208///
209/// * `data` - The complete bzip2 file data
210/// * `start_bit` - Bit offset where the block starts
211/// * `end_bit` - Bit offset where the block ends
212///
213/// # Returns
214///
215/// The decompressed block data
216///
217/// # Errors
218///
219/// Returns an error if the block is corrupted or cannot be decompressed.
220///
221/// # Examples
222///
223/// ```no_run
224/// use parallel_bzip2_decoder::{scan_blocks, decompress_block};
225///
226/// let data = std::fs::read("file.bz2").unwrap();
227/// let blocks = scan_blocks(&data);
228///
229/// if let Some((start, end)) = blocks.iter().next() {
230/// let decompressed = decompress_block(&data, start, end).unwrap();
231/// println!("Decompressed {} bytes", decompressed.len());
232/// }
233/// ```
234pub fn decompress_block(data: &[u8], start_bit: u64, end_bit: u64) -> Result<Vec<u8>> {
235 let mut out = Vec::new();
236 let mut scratch = Vec::new();
237 decompress_block_into(data, start_bit, end_bit, &mut out, &mut scratch)?;
238 Ok(out)
239}
240
241/// Decompresses a single bzip2 block into provided buffers (zero-allocation).
242///
243/// This function is optimized for decompressing multiple blocks by reusing buffers.
244/// It's used internally by the parallel decoder for maximum performance.
245///
246/// # Arguments
247///
248/// * `data` - The complete bzip2 file data
249/// * `start_bit` - Bit offset where the block starts
250/// * `end_bit` - Bit offset where the block ends
251/// * `out` - Output buffer for decompressed data (will be cleared)
252/// * `scratch` - Scratch buffer for compressed data with header (will be cleared)
253///
254/// # Performance
255///
256/// By reusing `scratch` across multiple calls, this function avoids allocating
257/// a new buffer for each block. This is especially important in parallel scenarios
258/// where thousands of blocks may be processed.
259///
260/// # Errors
261///
262/// Returns an error if the block is corrupted or cannot be decompressed.
263///
264/// # Examples
265///
266/// ```no_run
267/// use parallel_bzip2_decoder::{scan_blocks, decompress_block_into};
268///
269/// let data = std::fs::read("file.bz2").unwrap();
270/// let blocks = scan_blocks(&data);
271///
272/// let mut out = Vec::new();
273/// let mut scratch = Vec::new();
274///
275/// for (start, end) in blocks {
276/// decompress_block_into(&data, start, end, &mut out, &mut scratch).unwrap();
277/// // Process `out`...
278/// }
279/// ```
280pub fn decompress_block_into(
281 data: &[u8],
282 start_bit: u64,
283 end_bit: u64,
284 out: &mut Vec<u8>,
285 scratch: &mut Vec<u8>,
286) -> Result<()> {
287 scratch.clear();
288 // Add minimal bzip2 header (BZh9 = highest compression level)
289 scratch.extend_from_slice(b"BZh9");
290 // Extract the block bits and append to scratch buffer
291 extract_bits(data, start_bit, end_bit, scratch);
292
293 // Decompress using the bzip2 crate
294 // Note: The last block may not have a proper EOS marker, causing UnexpectedEof
295 out.clear();
296 let mut decoder = BzDecoder::new(&scratch[..]);
297 match decoder.read_to_end(out) {
298 Ok(_) => Ok(()),
299 // UnexpectedEof is expected for the last block without EOS marker
300 Err(e) if e.kind() == std::io::ErrorKind::UnexpectedEof => Ok(()),
301 Err(e) => Err(e).context("Failed to decompress block"),
302 }
303}
304
305/// Decompresses an entire bzip2 file and returns the decompressed data.
306///
307/// This is a convenience function that combines scanning and decompression.
308/// It's primarily used for testing but can be useful for simple use cases.
309///
310/// For more control or streaming decompression, use `Bz2Decoder` instead.
311///
312/// # Arguments
313///
314/// * `path` - Path to the bzip2 file
315///
316/// # Returns
317///
318/// The complete decompressed file contents
319///
320/// # Errors
321///
322/// Returns an error if:
323/// - The file cannot be opened
324/// - The file is not a valid bzip2 file
325/// - Decompression fails
326///
327/// # Examples
328///
329/// ```no_run
330/// use parallel_bzip2_decoder::parallel_bzip2_cat;
331///
332/// let data = parallel_bzip2_cat("file.bz2").unwrap();
333/// println!("Decompressed {} bytes", data.len());
334/// ```
335pub fn parallel_bzip2_cat<P: AsRef<std::path::Path>>(path: P) -> Result<Vec<u8>> {
336 let mut decoder = Bz2Decoder::open(path)?;
337 let mut data = Vec::new();
338 decoder.read_to_end(&mut data)?;
339 Ok(data)
340}