gzippy 0.8.0

The fastest parallel gzip. Drop-in replacement for gzip and pigz, and a Rust library.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
//! Pipelined compression with dictionary sharing for maximum compression
//!
//! At high compression levels (L7-L9), users expect maximum compression ratio.
//! This module implements pigz-style pipelined compression where each block
//! uses the previous block's data as a dictionary.
//!
//! Key insight from pigz: blocks can be compressed in PARALLEL because
//! block N only needs block N-1's INPUT data as dictionary, not its output.
//! Since all input is pre-read (mmap), we can pipeline efficiently.
//!
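//! Trade-off:
//! - Better compression ratio than independent blocks (comparable to pigz)
//! - The result is one ordinary gzip member, but it has no independently
//!   decodable blocks, so it must be decompressed sequentially (like pigz output)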
//!
//! This is used when compression_level >= 7 and threads > 1.

use crate::compress::parallel::{adjust_compression_level, GzipHeaderInfo};
use crate::infra::scheduler::compress_parallel;
use flate2::{Compress, Compression, FlushCompress, Status};
use std::cell::{RefCell, UnsafeCell};
use std::io::{self, Read, Write};
use std::mem::MaybeUninit;
use std::path::Path;

// Per-thread cached raw-deflate compressor used by `compress_block_with_dict`.
//
// Constructing a `Compress` allocates zlib's internal state (~300KB), so each
// worker thread keeps one and `reset()`s it between blocks instead of building a
// new one. The `u32` is the compression level the cached object was created
// with; a request for a different level replaces the cached object.
thread_local! {
    static PIPELINED_COMPRESS: RefCell<Option<(u32, Compress)>> = const { RefCell::new(None) };
}

/// Block size used for pipelined compression of inputs below the large-file
/// threshold: 128 KiB, the same default block size pigz uses.
const DEFAULT_BLOCK_SIZE: usize = 128 << 10;

/// Preset-dictionary length: 32 KiB, the largest back-reference distance
/// DEFLATE supports, so a longer dictionary could never be referenced.
const DICT_SIZE: usize = 32 << 10;

/// Write-once storage for one block's CRC-32 state.
///
/// In `compress_parallel_pipeline`, the worker that compresses block `i`
/// stores its hasher in slot `i`. After the scheduler returns, the slots are
/// read back in order and combined. `UnsafeCell<MaybeUninit<_>>` avoids a lock
/// or `Option` per block. Soundness rests on that usage protocol, not on this
/// type itself.
struct CrcSlot(UnsafeCell<MaybeUninit<crc32fast::Hasher>>);
// SAFETY: sharing across threads is sound only if
// (1) each slot is written by exactly one worker,
// (2) no slot is read until all workers have finished, and
// (3) every slot has been written before it is read with `assume_init_read`.
// These properties are presumed to be guaranteed by `compress_parallel`
// (one call per block index, all workers joined before it returns).
unsafe impl Sync for CrcSlot {}

/// Choose the block size used to split the input for pipelined compression.
///
/// Inputs below 50 MiB use pigz's 128 KiB blocks. Larger inputs use 192 KiB
/// blocks. For a 100 MiB file that cuts the block count from 800 to 534. Fewer
/// blocks means less per-block coordination overhead, which was noticeable on
/// small (4-vCPU) CI runners. The ratio cost of the extra dictionary resets at
/// 128 KiB stays small. 192 KiB is a compromise between pigz's 128 KiB and a
/// tested 256 KiB.
///
/// `_num_threads` and `_level` are currently ignored; they are kept so the
/// heuristic can take them into account without changing callers.
#[inline]
fn pipelined_block_size(input_len: usize, _num_threads: usize, _level: u32) -> usize {
    const LARGE_INPUT_THRESHOLD: usize = 50 * 1024 * 1024;
    const LARGE_INPUT_BLOCK_SIZE: usize = 192 * 1024;

    match input_len {
        len if len >= LARGE_INPUT_THRESHOLD => LARGE_INPUT_BLOCK_SIZE,
        _ => DEFAULT_BLOCK_SIZE,
    }
}

/// Pipelined gzip compression with dictionary sharing
///
/// This produces a single gzip member with dictionary sharing between
/// internal blocks, achieving compression ratios comparable to pigz.
///
/// The output is gzip-compatible but requires sequential decompression.
pub struct PipelinedGzEncoder {
    compression_level: u32,
    num_threads: usize,
    header_info: GzipHeaderInfo,
}

impl PipelinedGzEncoder {
    pub fn new(compression_level: u32, num_threads: usize) -> Self {
        Self {
            compression_level,
            num_threads,
            header_info: GzipHeaderInfo::default(),
        }
    }

    pub fn set_header_info(&mut self, info: GzipHeaderInfo) {
        self.header_info = info;
    }

    /// Build a flate2 GzBuilder with FNAME/MTIME/FCOMMENT from header_info
    fn gz_builder(&self) -> flate2::GzBuilder {
        let mut builder = flate2::GzBuilder::new();
        if let Some(ref name) = self.header_info.filename {
            builder = builder.filename(name.as_bytes());
        }
        builder = builder.mtime(self.header_info.mtime);
        if let Some(ref comment) = self.header_info.comment {
            builder = builder.comment(comment.as_bytes());
        }
        builder
    }

    /// Compress a pre-read buffer directly, avoiding the extra copy from Reader→Vec.
    pub fn compress_buffer<W: Write + Send>(&self, data: &[u8], writer: W) -> io::Result<u64> {
        if data.is_empty() {
            let encoder = self
                .gz_builder()
                .write(writer, Compression::new(self.compression_level));
            encoder.finish()?;
            return Ok(0);
        }

        if self.num_threads > 1 {
            self.compress_parallel_pipeline(data, writer)?;
        } else {
            self.compress_sequential(data, writer)?;
        }
        Ok(data.len() as u64)
    }

    /// Compress data with dictionary sharing
    pub fn compress<R: Read, W: Write + Send>(&self, mut reader: R, writer: W) -> io::Result<u64> {
        // Read all input data
        let mut input_data = Vec::new();
        let bytes_read = reader.read_to_end(&mut input_data)? as u64;

        if input_data.is_empty() {
            // Write empty gzip file
            let encoder = self
                .gz_builder()
                .write(writer, Compression::new(self.compression_level));
            encoder.finish()?;
            return Ok(0);
        }

        if self.num_threads > 1 {
            self.compress_parallel_pipeline(&input_data, writer)?;
        } else {
            self.compress_sequential(&input_data, writer)?;
        }
        Ok(bytes_read)
    }

    /// Compress file using memory-mapped I/O with dictionary sharing
    pub fn compress_file<P: AsRef<Path>, W: Write + Send>(
        &self,
        path: P,
        writer: W,
    ) -> io::Result<u64> {
        use memmap2::Mmap;
        use std::fs::File;

        let file = File::open(path.as_ref())?;
        let file_len = file.metadata()?.len() as usize;

        if file_len == 0 {
            let encoder = self
                .gz_builder()
                .write(writer, Compression::new(self.compression_level));
            encoder.finish()?;
            return Ok(0);
        }

        // Memory-map the file for zero-copy access
        let mmap = unsafe { Mmap::map(&file)? };

        // Hint to kernel that we'll access the data sequentially
        #[cfg(unix)]
        {
            let _ = mmap.advise(memmap2::Advice::Sequential);
        }

        if self.num_threads > 1 {
            self.compress_parallel_pipeline(&mmap, writer)?;
        } else {
            self.compress_sequential(&mmap, writer)?;
        }
        Ok(file_len as u64)
    }

    /// Parallel pipelined compression using custom scheduler
    ///
    /// Each block is compressed with dictionary = previous block's input data.
    /// Blocks are compressed in parallel since we have all input data upfront.
    /// Output is streamed in order as blocks complete.
    ///
    /// Uses our custom scheduler instead of rayon for:
    /// - Zero work-stealing overhead (uniform block sizes)
    /// - Streaming output (no bulk collection)
    /// - Pre-allocated buffers (no allocation in hot path)
    fn compress_parallel_pipeline<W: Write + Send>(
        &self,
        data: &[u8],
        mut writer: W,
    ) -> io::Result<()> {
        let level = adjust_compression_level(self.compression_level);
        let data_len = data.len();
        let block_size = pipelined_block_size(data_len, self.num_threads, level);
        let num_blocks = data_len.div_ceil(block_size);

        // Write gzip header before spawning threads
        let mut header = Vec::with_capacity(64);
        let mut flags: u8 = 0x00;
        if self.header_info.filename.is_some() {
            flags |= 0x08;
        }
        if self.header_info.comment.is_some() {
            flags |= 0x10;
        }
        header.extend_from_slice(&[0x1f, 0x8b, 0x08, flags]);
        header.extend_from_slice(&self.header_info.mtime.to_le_bytes());
        header.extend_from_slice(&[0x00, 0xff]);
        if let Some(ref name) = self.header_info.filename {
            header.extend_from_slice(name.as_bytes());
            header.push(0);
        }
        if let Some(ref comment) = self.header_info.comment {
            header.extend_from_slice(comment.as_bytes());
            header.push(0);
        }
        writer.write_all(&header)?;

        // Use a CRC combiner to compute final CRC
        // Each block's CRC is computed in parallel, then combined
        let crc_parts: Vec<CrcSlot> = (0..num_blocks)
            .map(|_| CrcSlot(UnsafeCell::new(MaybeUninit::uninit())))
            .collect();

        // Compress all blocks using custom scheduler with dedicated writer thread
        let mut writer = compress_parallel(
            data,
            block_size,
            self.num_threads,
            writer,
            |block_idx, block, dict, is_last, output| {
                // Compress this block with dictionary
                compress_block_with_dict(block, dict, level, block_size, is_last, output);

                // Compute CRC for this block
                let mut hasher = crc32fast::Hasher::new();
                hasher.update(block);
                unsafe {
                    *crc_parts[block_idx].0.get() = MaybeUninit::new(hasher);
                }
            },
        )?;

        // Combine CRCs in order
        let mut combined_hasher = crc32fast::Hasher::new();
        for part in &crc_parts {
            let hasher = unsafe { (*part.0.get()).assume_init_read() };
            combined_hasher.combine(&hasher);
        }
        let combined_crc = combined_hasher.finalize();

        // Write gzip trailer
        let isize = (data_len as u32).to_le_bytes();
        writer.write_all(&combined_crc.to_le_bytes())?;
        writer.write_all(&isize)?;

        Ok(())
    }

    /// Sequential compression (single-threaded, for -p1)
    fn compress_sequential<W: Write>(&self, data: &[u8], mut writer: W) -> io::Result<()> {
        use crc32fast::Hasher;

        let level = adjust_compression_level(self.compression_level);

        // Write gzip header with FNAME/MTIME
        let mut header = Vec::with_capacity(64);
        let mut flags: u8 = 0x00;
        if self.header_info.filename.is_some() {
            flags |= 0x08;
        }
        if self.header_info.comment.is_some() {
            flags |= 0x10;
        }
        header.extend_from_slice(&[0x1f, 0x8b, 0x08, flags]);
        header.extend_from_slice(&self.header_info.mtime.to_le_bytes());
        header.extend_from_slice(&[0x00, 0xff]);
        if let Some(ref name) = self.header_info.filename {
            header.extend_from_slice(name.as_bytes());
            header.push(0);
        }
        if let Some(ref comment) = self.header_info.comment {
            header.extend_from_slice(comment.as_bytes());
            header.push(0);
        }
        writer.write_all(&header)?;

        let mut compress = Compress::new(Compression::new(level), false);
        let block_size = pipelined_block_size(data.len(), 1, level);
        let mut output_buf = vec![0u8; block_size * 2];
        let mut crc_hasher = Hasher::new();

        let blocks: Vec<&[u8]> = data.chunks(block_size).collect();

        for (i, block) in blocks.iter().enumerate() {
            crc_hasher.update(block);

            // Set dictionary from previous block
            if i > 0 {
                let prev = blocks[i - 1];
                let dict = if prev.len() > DICT_SIZE {
                    &prev[prev.len() - DICT_SIZE..]
                } else {
                    prev
                };
                let _ = compress.set_dictionary(dict);
            }

            let flush = if i == blocks.len() - 1 {
                FlushCompress::Finish
            } else {
                FlushCompress::Sync
            };

            let mut block_data = *block;
            loop {
                let before_in = compress.total_in();
                let before_out = compress.total_out();
                let status = compress.compress(block_data, &mut output_buf, flush)?;
                let consumed = (compress.total_in() - before_in) as usize;
                let produced = (compress.total_out() - before_out) as usize;

                if produced > 0 {
                    writer.write_all(&output_buf[..produced])?;
                }

                block_data = &block_data[consumed..];

                match status {
                    Status::Ok if block_data.is_empty() && flush != FlushCompress::Finish => break,
                    Status::BufError if produced == 0 => break,
                    Status::StreamEnd => break,
                    _ => {}
                }
            }
        }

        // Write trailer
        let crc = crc_hasher.finalize();
        writer.write_all(&crc.to_le_bytes())?;
        writer.write_all(&(data.len() as u32).to_le_bytes())?;

        Ok(())
    }
}

/// Compress one block as raw deflate into `output`, optionally primed with a
/// preset dictionary.
///
/// * `block` – the input bytes of this block.
/// * `dict` – data preceding `block`; only its last `DICT_SIZE` bytes are used.
///   Installing it is best-effort. If zlib rejects it, the block is compressed
///   without it, which is still valid output (just less compact).
/// * `level` – zlib compression level; the per-thread `Compress` is rebuilt
///   when it changes.
/// * `block_size` – nominal block size, used only to pre-size `output`.
/// * `is_last` – finish the deflate stream (final block) instead of ending with
///   a sync flush, which byte-aligns the output so blocks can be concatenated.
/// * `output` – cleared, then filled with the compressed bytes.
///
/// Uses the thread-local `Compress` object to avoid per-block allocation.
///
/// # Panics
/// Panics if zlib reports a stream error ("compression failed"); the scheduler
/// callback has no error channel.
fn compress_block_with_dict(
    block: &[u8],
    dict: Option<&[u8]>,
    level: u32,
    block_size: usize,
    is_last: bool,
    output: &mut Vec<u8>,
) {
    PIPELINED_COMPRESS.with(|comp_cell| {
        let mut comp_opt = comp_cell.borrow_mut();

        output.clear();

        // Deflate's worst-case expansion is a few bytes per stored block, so
        // this comfortably fits incompressible data. `output` is empty here, so
        // `reserve(n)` guarantees capacity >= n; it is a no-op when the buffer
        // is already large enough.
        let initial_capacity = block_size + (block_size / 10) + 1024;
        output.reserve(initial_capacity);

        // Get or create Compress at the right level
        let compress = match comp_opt.as_mut() {
            Some((cached_level, comp)) if *cached_level == level => {
                comp.reset();
                comp
            }
            _ => {
                *comp_opt = Some((level, Compress::new(Compression::new(level), false)));
                &mut comp_opt.as_mut().unwrap().1
            }
        };

        // Prime with the tail of the previous block's input.
        if let Some(d) = dict {
            let dict_slice = &d[d.len().saturating_sub(DICT_SIZE)..];
            let _ = compress.set_dictionary(dict_slice);
        }

        let flush = if is_last {
            FlushCompress::Finish
        } else {
            FlushCompress::Sync
        };

        let mut input = block;

        loop {
            // `compress_vec` writes only into spare capacity; never call it
            // with none, or it can make no progress.
            if output.len() == output.capacity() {
                output.reserve(output.capacity().max(1024));
            }

            let before_in = compress.total_in();
            let status = compress
                .compress_vec(input, output, flush)
                .expect("compression failed");

            let consumed = (compress.total_in() - before_in) as usize;
            input = &input[consumed..];

            // zlib only guarantees a sync flush is complete when it returns with
            // output space left over (avail_out > 0). A full buffer may still
            // hide pending flush bytes.
            let has_spare = output.len() < output.capacity();

            match status {
                Status::StreamEnd => break,
                Status::Ok | Status::BufError if !is_last && input.is_empty() && has_spare => {
                    break
                }
                // Output full (or input remaining): loop to grow the buffer
                // and keep draining.
                _ => {}
            }
        }
    })
}

#[cfg(test)]
mod tests {
    use super::*;
    use flate2::read::GzDecoder;
    use std::io::Read;

    /// Decompress a complete gzip stream with flate2, panicking on any error.
    fn gunzip(gz: &[u8]) -> Vec<u8> {
        let mut decoder = GzDecoder::new(gz);
        let mut out = Vec::new();
        decoder.read_to_end(&mut out).unwrap();
        out
    }

    /// Deterministic, essentially incompressible bytes (xorshift64), so tests
    /// need no RNG crate.
    fn pseudo_random(len: usize) -> Vec<u8> {
        let mut state: u64 = 0x9E37_79B9_7F4A_7C15;
        (0..len)
            .map(|_| {
                state ^= state << 13;
                state ^= state >> 7;
                state ^= state << 17;
                (state >> 24) as u8
            })
            .collect()
    }

    #[test]
    fn test_pipelined_compress() {
        let data = b"Hello, world! ".repeat(10000);
        let mut output = Vec::new();

        let encoder = PipelinedGzEncoder::new(9, 4);
        encoder
            .compress(std::io::Cursor::new(&data), &mut output)
            .unwrap();

        assert_eq!(gunzip(&output), data);
    }

    #[test]
    fn test_sequential_roundtrip() {
        let data = b"Hello, world! ".repeat(10000);
        let mut output = Vec::new();

        let encoder = PipelinedGzEncoder::new(9, 1);
        let consumed = encoder
            .compress(std::io::Cursor::new(&data), &mut output)
            .unwrap();

        assert_eq!(consumed, data.len() as u64);
        assert_eq!(gunzip(&output), data);
    }

    #[test]
    fn test_empty_input() {
        for threads in [1, 4] {
            let mut output = Vec::new();
            let consumed = PipelinedGzEncoder::new(9, threads)
                .compress_buffer(&[], &mut output)
                .unwrap();
            assert_eq!(consumed, 0);
            assert!(gunzip(&output).is_empty(), "threads = {threads}");
        }
    }

    #[test]
    fn test_incompressible_multi_block_roundtrip() {
        // Several full blocks plus a partial last block of data deflate cannot
        // shrink, exercising the output-growth path and the final Finish block.
        let data = pseudo_random(4 * DEFAULT_BLOCK_SIZE + 12_345);
        for threads in [1, 4] {
            let mut output = Vec::new();
            PipelinedGzEncoder::new(9, threads)
                .compress_buffer(&data, &mut output)
                .unwrap();
            assert_eq!(gunzip(&output), data, "threads = {threads}");
        }
    }

    // Fields are assigned one by one because struct-update syntax would not
    // compile if `GzipHeaderInfo` has fields that are not visible here.
    #[allow(clippy::field_reassign_with_default)]
    #[test]
    fn test_header_fields_roundtrip() {
        use crate::compress::parallel::GzipHeaderInfo;

        let data = b"header test payload ".repeat(1000);
        for threads in [1, 4] {
            let mut info = GzipHeaderInfo::default();
            info.filename = Some("input.txt".into());
            info.comment = Some("written by tests".into());
            info.mtime = 1_700_000_000;

            let mut encoder = PipelinedGzEncoder::new(9, threads);
            encoder.set_header_info(info);
            let mut output = Vec::new();
            encoder.compress_buffer(&data, &mut output).unwrap();

            let mut decoder = GzDecoder::new(&output[..]);
            let mut decompressed = Vec::new();
            decoder.read_to_end(&mut decompressed).unwrap();
            assert_eq!(decompressed, data, "threads = {threads}");

            let header = decoder.header().expect("gzip header parsed");
            assert_eq!(header.filename(), Some(&b"input.txt"[..]));
            assert_eq!(header.comment(), Some(&b"written by tests"[..]));
            assert_eq!(header.mtime(), 1_700_000_000);
        }
    }

    #[test]
    fn test_pipelined_vs_parallel_size() {
        use crate::compress::parallel::ParallelGzEncoder;

        let data = b"The quick brown fox jumps over the lazy dog. ".repeat(5000);

        // Pipelined (with dictionary)
        let mut pipelined_output = Vec::new();
        let pipelined = PipelinedGzEncoder::new(9, 4);
        pipelined
            .compress(std::io::Cursor::new(&data), &mut pipelined_output)
            .unwrap();

        // Parallel (independent blocks)
        let mut parallel_output = Vec::new();
        let parallel = ParallelGzEncoder::new(9, 4);
        parallel
            .compress(std::io::Cursor::new(&data), &mut parallel_output)
            .unwrap();

        // Pipelined should be smaller (dictionary sharing)
        println!(
            "Pipelined: {} bytes, Parallel: {} bytes",
            pipelined_output.len(),
            parallel_output.len()
        );
        assert!(
            pipelined_output.len() <= parallel_output.len(),
            "Pipelined should produce smaller output"
        );
    }
}