gzippy 0.8.0

The fastest parallel gzip. Drop-in replacement for gzip and pigz, and a Rust library.
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
834
835
836
837
838
839
840
841
842
843
844
845
846
847
848
849
850
851
852
853
854
855
856
857
858
859
860
861
862
863
864
865
866
867
868
869
870
871
872
873
874
875
876
877
878
879
880
881
882
883
884
885
886
887
888
889
890
891
892
893
894
895
896
897
898
899
900
901
902
903
904
905
906
907
908
909
910
911
912
913
914
915
916
917
918
919
920
921
922
923
924
925
926
927
928
929
930
931
932
933
934
935
936
937
938
//! High-performance parallel gzip compression
//!
//! This module implements parallel compression using memory-mapped I/O and
//! our custom zero-overhead scheduler (no rayon).
//!
//! For large files, we use block-based parallel compression where each block
//! is a complete gzip member that can be concatenated.
//!
//! Key optimizations:
//! - Memory-mapped files for zero-copy access (no read_to_end latency)
//! - Custom scheduler with streaming output (no bulk collection)
//! - Thread-local buffer reuse to minimize allocations
//! - Level-dependent block sizes from 64KB up to 4MB (see `get_optimal_block_size`)
//! - libdeflate for L1-L6 (30-50% faster than zlib-ng)
//! - BGZF-style block size markers in FEXTRA for fast parallel decompression

use crate::infra::scheduler::compress_parallel_independent;
use flate2::Compression;
use memmap2::Mmap;
use std::cell::RefCell;
use std::fs::File;
use std::io::{self, Read, Write};
use std::path::Path;

/// Two-byte FEXTRA subfield ID (`b"GZ"`) used for the block size marker.
///
/// The block compressors in this module write this ID into each member's
/// FEXTRA field, followed by a 2-byte subfield length and a 4-byte
/// little-endian size of the whole member (header + deflate data + trailer).
pub const GZ_SUBFIELD_ID: [u8; 2] = [b'G', b'Z'];

/// Metadata written into the gzip header: FNAME, MTIME and FCOMMENT.
///
/// The derived `Default` has no filename, no comment and an MTIME of 0.
#[derive(Clone, Debug, Default)]
pub struct GzipHeaderInfo {
    /// Original filename for the FNAME field; `None` omits the field.
    /// Callers are expected to pass the basename only (not enforced here).
    pub filename: Option<String>,
    /// File modification time as a Unix timestamp for the MTIME field.
    pub mtime: u32,
    /// Optional comment for the FCOMMENT field; `None` omits the field.
    pub comment: Option<String>,
}

/// Remap a requested compression level for the zlib-ng (flate2) backend.
///
/// Level 1 is bumped to level 2; every other level is returned unchanged.
/// The original author notes that zlib-ng's level 1 uses an RLE strategy that
/// yields noticeably larger output, which is why it is avoided.
///
/// Paths that compress with libdeflate do not need this mapping; it matters
/// only where a level is handed to `flate2::Compression::new`.
#[inline]
pub fn adjust_compression_level(level: u32) -> u32 {
    match level {
        // zlib-ng L1 -> L2
        1 => 2,
        unchanged => unchanged,
    }
}

/// Baseline block size (in bytes) for a compression level.
///
/// This lookup depends on the level only; file size and thread count are
/// taken into account by `get_optimal_block_size`.
///
/// | level   | block size            |
/// |---------|-----------------------|
/// | 1-2     | 128 KiB               |
/// | 3-6     | `DEFAULT_BLOCK_SIZE`  |
/// | 7-9     | 128 KiB               |
/// | other   | 512 KiB               |
///
/// "other" covers L10-L12 (larger blocks give the exhaustive match search
/// more context) and also level 0, which has no dedicated arm.
#[inline]
pub fn get_block_size_for_level(level: u32) -> usize {
    const KIB: usize = 1024;

    match level {
        // Mid levels use the module-wide default block size.
        3..=6 => DEFAULT_BLOCK_SIZE,
        // Fast levels, plus L7-L9 (which the original notes are normally
        // routed to the pipelined compressor instead of reaching here).
        1 | 2 | 7..=9 => 128 * KIB,
        // Ultra levels (and anything else): bigger blocks for better ratio.
        _ => 512 * KIB,
    }
}

/// Pick a block size from the level, the input size and the thread count.
///
/// * Levels above 2 simply use `get_block_size_for_level(level)`.
/// * Levels 0-2 size blocks dynamically, aiming for roughly four blocks per
///   thread (never fewer than four blocks overall). The result is clamped to
///   at least 256 KiB and at most 4 MiB (or 256 KiB when `num_threads >= 8`,
///   which makes the size a fixed 256 KiB), then rounded up to a multiple of
///   64 KiB.
///
/// The rationale given by the original author: per-block setup cost at the
/// fast levels is significant, so fewer, larger blocks scale better.
#[inline]
pub fn get_optimal_block_size(level: u32, file_size: usize, num_threads: usize) -> usize {
    const KIB: usize = 1024;
    const MIN_DYNAMIC: usize = 256 * KIB;
    const MAX_DYNAMIC: usize = 4 * KIB * KIB;
    const ALIGN: usize = 64 * KIB;

    if level > 2 {
        return get_block_size_for_level(level);
    }

    // ~4 blocks per thread for load balancing, with a floor of 4 blocks.
    let wanted_blocks = usize::max(num_threads * 4, 4);

    // Many threads: cap blocks at the minimum size for finer load balancing.
    let ceiling = if num_threads < 8 {
        MAX_DYNAMIC
    } else {
        MIN_DYNAMIC
    };

    let per_block = (file_size / wanted_blocks).clamp(MIN_DYNAMIC, ceiling);

    // Round up to the next 64 KiB boundary.
    (per_block + (ALIGN - 1)) & !(ALIGN - 1)
}

// Per-thread scratch state, so worker threads do not allocate for every block.
thread_local! {
    // Reusable byte buffer, one per thread. Its users are not visible in this
    // part of the file.
    static COMPRESS_BUF: RefCell<Vec<u8>> = const { RefCell::new(Vec::new()) };
    // Cached libdeflate compressor, stored as (level, compressor).
    // Only one level is cached per thread: a request for a different level
    // replaces the cached entry (see compress_block_bgzf_libdeflate).
    static LIBDEFLATE_COMPRESSOR: RefCell<Option<(i32, libdeflater::Compressor)>> = const { RefCell::new(None) };
}

/// Default uncompressed block size for parallel compression: 64 KiB (65536 bytes).
///
/// NOTE(review): the original comment tied this value to BGZF's u16 block
/// size field (max 65535), but `64 * 1024` is 65536, one more than that
/// limit, and the "GZ" subfield written by this module stores the size in
/// 4 bytes. Confirm whether a strict BGZF limit is actually required here.
const DEFAULT_BLOCK_SIZE: usize = 64 * 1024;

/// Parallel gzip encoder built on the crate's custom block scheduler.
///
/// Holds the configuration only; input is supplied per call via
/// `compress`, `compress_buffer` or `compress_file`.
pub struct ParallelGzEncoder {
    /// Requested compression level; values above 9 are clamped where the
    /// flate2 backend is used.
    compression_level: u32,
    /// Number of worker threads; a value of 1 selects the single-stream path
    /// for levels up to 9.
    num_threads: usize,
    /// FNAME / MTIME / FCOMMENT values written into the gzip header(s).
    header_info: GzipHeaderInfo,
}

impl ParallelGzEncoder {
    /// Create an encoder for the given compression level and thread count.
    ///
    /// Header metadata starts as `GzipHeaderInfo::default()`; call
    /// `set_header_info` to supply a filename, mtime or comment.
    pub fn new(compression_level: u32, num_threads: usize) -> Self {
        Self {
            compression_level,
            num_threads,
            header_info: GzipHeaderInfo::default(),
        }
    }

    /// Replace the header metadata (FNAME / MTIME / FCOMMENT) used for
    /// subsequent compression calls.
    pub fn set_header_info(&mut self, info: GzipHeaderInfo) {
        self.header_info = info;
    }

    /// Compress data in parallel and write to output
    pub fn compress<R: Read, W: Write + Send>(&self, mut reader: R, writer: W) -> io::Result<u64> {
        // Read all input data
        let mut input_data = Vec::new();
        let bytes_read = reader.read_to_end(&mut input_data)? as u64;

        if input_data.is_empty() {
            // Write empty gzip file - use level 9 for zlib (L10+ aren't supported by zlib)
            let zlib_level = self.compression_level.min(9);
            let encoder = self.gz_builder().write(
                writer,
                Compression::new(adjust_compression_level(zlib_level)),
            );
            encoder.finish()?;
            return Ok(0);
        }

        // Calculate optimal block size
        let block_size = self.calculate_block_size(input_data.len());

        // For small files or single thread, choose the fastest single-member path
        // For L10-L12, always use libdeflate blocks for better compression
        if self.compression_level <= 9 && (input_data.len() <= block_size || self.num_threads == 1)
        {
            let mut w = writer;
            return self
                .compress_single_stream(&input_data, &mut w)
                .map(|_| bytes_read);
        }

        // Large file with multiple threads, or L10-L12: compress blocks in parallel with libdeflate
        // Note: compress_parallel() has its own ratio probe that falls back to the
        // fastest streaming backend (ISA-L on x86, flate2 on arm64) for highly
        // compressible data where parallelism overhead exceeds benefit.
        let _ = self.compress_parallel(&input_data, block_size, writer)?;

        Ok(bytes_read)
    }

    /// Compress an in-memory buffer and write gzip to `writer`.
    ///
    /// Skips the Reader→Vec copy that `compress` needs; the original notes it
    /// is used by the stdin path.
    ///
    /// * Empty input: writes an empty gzip member and returns `Ok(0)`.
    /// * Level <= 9 and (input fits in one block, or one thread): single-stream.
    /// * Otherwise (including all levels above 9): parallel blocks.
    ///
    /// Returns the uncompressed length of `data`.
    pub fn compress_buffer<W: Write + Send>(&self, data: &[u8], writer: W) -> io::Result<u64> {
        let input_len = data.len() as u64;

        if data.is_empty() {
            // The flate2 backend has no levels above 9, so clamp first.
            let capped = self.compression_level.min(9);
            let level = Compression::new(adjust_compression_level(capped));
            self.gz_builder().write(writer, level).finish()?;
            return Ok(0);
        }

        let block_size = self.calculate_block_size(data.len());
        let fits_one_block = data.len() <= block_size;
        let single_stream =
            self.compression_level <= 9 && (fits_one_block || self.num_threads == 1);

        if single_stream {
            let mut sink = writer;
            self.compress_single_stream(data, &mut sink)?;
            return Ok(input_len);
        }

        self.compress_parallel(data, block_size, writer)
            .map(|_| input_len)
    }

    /// Build a flate2 GzBuilder with FNAME/MTIME/FCOMMENT from header_info
    fn gz_builder(&self) -> flate2::GzBuilder {
        let mut builder = flate2::GzBuilder::new();
        if let Some(ref name) = self.header_info.filename {
            builder = builder.filename(name.as_bytes());
        }
        builder = builder.mtime(self.header_info.mtime);
        if let Some(ref comment) = self.header_info.comment {
            builder = builder.comment(comment.as_bytes());
        }
        builder
    }

    /// Block size for an input of `file_size` bytes, using this encoder's
    /// compression level and thread count (delegates to
    /// `get_optimal_block_size`).
    fn calculate_block_size(&self, file_size: usize) -> usize {
        get_optimal_block_size(self.compression_level, file_size, self.num_threads)
    }

    /// Compress the file at `path` through a read-only memory map and write
    /// gzip to `writer`.
    ///
    /// * Zero-length file: writes an empty gzip member and returns `Ok(0)`.
    /// * Level <= 9 and (file fits in one block, or one thread): single-stream.
    /// * Otherwise (including all levels above 9): parallel blocks.
    ///
    /// Returns the uncompressed file length on every non-empty path, matching
    /// `compress` and `compress_buffer`.
    ///
    /// # Errors
    /// Propagates errors from opening, stat-ing or mapping the file and from
    /// writing to `writer`.
    pub fn compress_file<P: AsRef<Path>, W: Write + Send>(
        &self,
        path: P,
        mut writer: W,
    ) -> io::Result<u64> {
        let file = File::open(path)?;
        let file_len = file.metadata()?.len() as usize;

        if file_len == 0 {
            // Write empty gzip file - use level 9 for zlib (L10+ aren't supported by zlib)
            let zlib_level = self.compression_level.min(9);
            let encoder = self.gz_builder().write(
                &mut writer,
                Compression::new(adjust_compression_level(zlib_level)),
            );
            encoder.finish()?;
            return Ok(0);
        }

        // SAFETY: the mapping is read-only and does not outlive this call. As
        // with any file mapping, soundness relies on the underlying file not
        // being truncated or modified by another process while it is mapped.
        let mmap = unsafe { Mmap::map(&file)? };
        if self.num_threads > 1 {
            // Best-effort access hint; a failure here is deliberately ignored.
            let _ = mmap.advise(memmap2::Advice::Sequential);
        }

        let block_size = self.calculate_block_size(file_len);
        let uncompressed_len = file_len as u64;

        // For small files or single thread, choose the fastest single-member path.
        // L10-L12 always go through the block path below.
        if self.compression_level <= 9 && (file_len <= block_size || self.num_threads == 1) {
            // Report the input size rather than whatever the backend returns,
            // so every entry point of the encoder returns the same quantity.
            self.compress_single_stream(&mmap, &mut writer)?;
            return Ok(uncompressed_len);
        }

        // Large file with multiple threads, or L10-L12: independent blocks in parallel.
        let _ = self.compress_parallel(&mmap, block_size, writer)?;

        Ok(uncompressed_len)
    }

    /// Compress `data` as a single stream with the backend chosen by level.
    ///
    /// 1. Levels 0-3 with ISA-L available: ISA-L's gzip writer. The return
    ///    value on this path is whatever `compress_gzip_to_writer` reports.
    /// 2. Levels 1-5 otherwise: `compress_single_member`.
    /// 3. Everything else: flate2 streaming, with level 1 remapped by
    ///    `adjust_compression_level`.
    ///
    /// Paths 2 and 3 return the uncompressed length of `data`.
    fn compress_single_stream<W: Write>(&self, data: &[u8], writer: &mut W) -> io::Result<u64> {
        let level = self.compression_level;

        match level {
            // The availability check only runs for levels 0-3.
            0..=3 if crate::backends::isal_compress::is_available() => {
                crate::backends::isal_compress::compress_gzip_to_writer(data, &mut *writer, level)
            }
            1..=5 => {
                compress_single_member(&mut *writer, data, level, &self.header_info)?;
                Ok(data.len() as u64)
            }
            _ => {
                let flate_level = Compression::new(adjust_compression_level(level));
                let mut encoder = self.gz_builder().write(&mut *writer, flate_level);
                encoder.write_all(data)?;
                encoder.finish()?;
                Ok(data.len() as u64)
            }
        }
    }

    /// Compress `data` as independent blocks of `block_size` bytes via the
    /// crate's scheduler, returning the writer on success.
    ///
    /// Each block is emitted as its own gzip member carrying the "GZ"
    /// block-size subfield. ISA-L is used for levels 0-3 when it is available;
    /// otherwise libdeflate compresses the block.
    ///
    /// There is no single-stream fallback for highly compressible input: the
    /// original author removed it, judging parallel blocks to be faster
    /// because per-block CPU time dominates and header overhead is negligible.
    fn compress_parallel<W: Write + Send>(
        &self,
        data: &[u8],
        block_size: usize,
        writer: W,
    ) -> io::Result<W> {
        let level = self.compression_level;
        let header = self.header_info.clone();

        // Decide the backend once, outside the per-block closure.
        let isal_enabled = level <= 3 && crate::backends::isal_compress::is_available();

        compress_parallel_independent(
            data,
            block_size,
            self.num_threads,
            writer,
            |block, output| {
                if !isal_enabled {
                    compress_block_bgzf_libdeflate(output, block, level, &header);
                } else {
                    compress_block_bgzf_isal(output, block, level, &header);
                }
            },
        )
    }
}

/// Compress all of `input` into one gzip member and write it to `writer`.
///
/// Backend selection:
/// - levels 0-3 with ISA-L available: `compress_block_bgzf_isal`
/// - otherwise: `compress_block_bgzf_libdeflate`
///
/// The member carries FNAME/MTIME/FCOMMENT from `header_info` plus the "GZ"
/// block-size subfield in FEXTRA.
///
/// Empty input is a special case: an empty member is written through flate2
/// with a default header (`header_info` is not applied) and `Ok(0)` is
/// returned.
///
/// Returns the uncompressed length of `input`.
pub fn compress_single_member<W: Write>(
    writer: &mut W,
    input: &[u8],
    compression_level: u32,
    header_info: &GzipHeaderInfo,
) -> io::Result<u64> {
    if input.is_empty() {
        flate2::GzBuilder::new()
            .write(writer, Compression::new(compression_level.min(9)))
            .finish()?;
        return Ok(0);
    }

    // The whole member is assembled in memory, then written in one call.
    let mut member = Vec::with_capacity(input.len());

    let isal_eligible = compression_level <= 3 && crate::backends::isal_compress::is_available();
    if isal_eligible {
        compress_block_bgzf_isal(&mut member, input, compression_level, header_info);
    } else {
        compress_block_bgzf_libdeflate(&mut member, input, compression_level, header_info);
    }

    writer.write_all(&member)?;
    Ok(input.len() as u64)
}

/// Compress a block with BGZF-style gzip header containing block size
///
/// The header includes:
/// - Standard gzip magic (0x1f 0x8b)
/// - FEXTRA flag set (0x04), optionally FNAME (0x08) and FCOMMENT (0x10)
/// - "GZ" subfield with compressed block size (allows parallel decompression)
/// - MTIME from file metadata (when available)
/// - Original filename (when available)
///
/// This is compatible with all gzip decompressors (they ignore unknown subfields)
/// but enables gzippy to find block boundaries without inflating.
///
/// Uses libdeflate for L1-L6 (faster, no dictionary needed).
pub fn compress_block_bgzf_libdeflate(
    output: &mut Vec<u8>,
    block: &[u8],
    compression_level: u32,
    header_info: &GzipHeaderInfo,
) {
    use libdeflater::{CompressionLvl, Compressor};

    output.clear();

    // Reserve space for header (we'll write block size later)
    let header_start = output.len();

    // Build flags: FEXTRA always set, optionally FNAME and FCOMMENT
    let mut flags: u8 = 0x04; // FEXTRA
    if header_info.filename.is_some() {
        flags |= 0x08; // FNAME
    }
    if header_info.comment.is_some() {
        flags |= 0x10; // FCOMMENT
    }

    // Write gzip header
    output.extend_from_slice(&[
        0x1f, 0x8b, // Magic
        0x08, // Compression method (deflate)
        flags,
    ]);
    output.extend_from_slice(&header_info.mtime.to_le_bytes()); // MTIME
    output.extend_from_slice(&[
        0x00, // XFL (no extra flags)
        0xff, // OS (unknown)
    ]);

    // FEXTRA: XLEN + subfield data
    // XLEN: 8 bytes (2 byte ID + 2 byte len + 4 byte block size)
    output.extend_from_slice(&[8, 0]);

    // Subfield: "GZ" + 2 bytes len + 4 bytes block size (placeholder)
    // Using 4 bytes allows block sizes up to 4GB
    output.extend_from_slice(&GZ_SUBFIELD_ID);
    output.extend_from_slice(&[4, 0]); // Subfield data length (4 bytes)
    let block_size_offset = output.len();
    output.extend_from_slice(&[0, 0, 0, 0]); // Placeholder for block size

    // FNAME (after FEXTRA, per RFC 1952 order)
    if let Some(ref name) = header_info.filename {
        output.extend_from_slice(name.as_bytes());
        output.push(0); // null terminator
    }

    // FCOMMENT (after FNAME)
    if let Some(ref comment) = header_info.comment {
        output.extend_from_slice(comment.as_bytes());
        output.push(0); // null terminator
    }

    // Get or create compressor from thread-local cache
    let level = compression_level as i32;
    let max_compressed_size = LIBDEFLATE_COMPRESSOR.with(|cache| {
        let mut cache = cache.borrow_mut();

        let compressor = match cache.as_mut() {
            Some((cached_level, comp)) if *cached_level == level => comp,
            _ => {
                let lvl = CompressionLvl::new(level).unwrap_or_default();
                *cache = Some((level, Compressor::new(lvl)));
                &mut cache.as_mut().unwrap().1
            }
        };

        compressor.deflate_compress_bound(block.len())
    });

    // Resize output buffer
    let deflate_start = output.len();
    output.resize(deflate_start + max_compressed_size, 0);

    // Do the actual compression
    let actual_len = LIBDEFLATE_COMPRESSOR.with(|cache| {
        let mut cache = cache.borrow_mut();
        let compressor = &mut cache.as_mut().unwrap().1;
        compressor
            .deflate_compress(block, &mut output[deflate_start..])
            .expect("libdeflate compression failed")
    });

    output.truncate(deflate_start + actual_len);

    // Compute CRC32 of uncompressed data
    let crc32 = crc32fast::hash(block);

    // Write gzip trailer: CRC32 + ISIZE (uncompressed size mod 2^32)
    output.extend_from_slice(&crc32.to_le_bytes());
    output.extend_from_slice(&(block.len() as u32).to_le_bytes());

    // Now write the total block size (including header and trailer)
    let total_block_size = output.len() - header_start;

    // Block size stored as u32 (no overflow possible for reasonable blocks)
    output[block_size_offset..block_size_offset + 4]
        .copy_from_slice(&(total_block_size as u32).to_le_bytes());
}

/// Compress `block` with ISA-L into a complete gzip member in `output`.
///
/// Writes the same header layout as `compress_block_bgzf_libdeflate`
/// (FEXTRA with the "GZ" size subfield, optional FNAME/FCOMMENT), so members
/// produced by either function can be concatenated in one file.
///
/// `output` is cleared first. The deflate data is written straight into
/// `output`'s spare capacity to avoid a per-block allocation and zero-fill.
///
/// If ISA-L reports failure (`compress_deflate_into` returns `None`), the
/// block is recompressed with `compress_block_bgzf_libdeflate`, which clears
/// `output` again and so discards the partial member. A warning is printed
/// to stderr in that case only when the `GZIPPY_DEBUG` environment variable
/// is set.
fn compress_block_bgzf_isal(
    output: &mut Vec<u8>,
    block: &[u8],
    compression_level: u32,
    header_info: &GzipHeaderInfo,
) {
    output.clear();
    // Always 0 after clear(); kept so the size computation below reads naturally.
    let header_start = output.len();

    // FLG byte: FEXTRA is always present, FNAME (0x08) / FCOMMENT (0x10) are optional.
    let mut flags: u8 = 0x04; // FEXTRA
    if header_info.filename.is_some() {
        flags |= 0x08;
    }
    if header_info.comment.is_some() {
        flags |= 0x10;
    }

    // Magic, method (deflate), flags; MTIME (little-endian); XFL = 0, OS = 0xff.
    output.extend_from_slice(&[0x1f, 0x8b, 0x08, flags]);
    output.extend_from_slice(&header_info.mtime.to_le_bytes());
    output.extend_from_slice(&[0x00, 0xff]);

    // FEXTRA: XLEN = 8 bytes (2 ID + 2 len + 4 block size)
    output.extend_from_slice(&[8, 0]);
    output.extend_from_slice(&GZ_SUBFIELD_ID);
    output.extend_from_slice(&[4, 0]);
    // Remember where the size goes; it is patched after the trailer is written.
    let block_size_offset = output.len();
    output.extend_from_slice(&[0, 0, 0, 0]); // placeholder

    // NUL-terminated FNAME, then NUL-terminated FCOMMENT.
    if let Some(ref name) = header_info.filename {
        output.extend_from_slice(name.as_bytes());
        output.push(0);
    }
    if let Some(ref comment) = header_info.comment {
        output.extend_from_slice(comment.as_bytes());
        output.push(0);
    }

    // Compress deflate data directly into output buffer (no intermediate alloc)
    let deflate_start = output.len();
    // Space reserved for the deflate stream: input size + 10% + 256 bytes.
    // NOTE(review): presumably an upper bound for ISA-L output; confirm
    // against the backend, since a too-small buffer takes the fallback path.
    let max_compressed = block.len() + block.len() / 10 + 256;
    let needed = deflate_start + max_compressed;
    // SAFETY: the reserve() call guarantees capacity >= `needed` before set_len.
    // The bytes in output[deflate_start..] are uninitialized until written; this
    // relies on compress_deflate_into only writing to that slice and never
    // reading it. The vector is truncated to the written length (or cleared by
    // the libdeflate fallback) right after the call, so no uninitialized byte
    // is exposed afterwards.
    // NOTE(review): the write-only contract of compress_deflate_into is not
    // visible here; verify it in the backend.
    #[allow(clippy::uninit_vec)]
    {
        output.reserve(needed.saturating_sub(output.len()));
        unsafe { output.set_len(needed) };
    }

    let compressed_len = crate::backends::isal_compress::compress_deflate_into(
        block,
        &mut output[deflate_start..],
        compression_level,
    );

    match compressed_len {
        Some(actual_len) => {
            // Keep only the bytes ISA-L actually produced.
            output.truncate(deflate_start + actual_len);
        }
        None => {
            // ISA-L failed: optionally log, then redo the whole member with libdeflate.
            if std::env::var("GZIPPY_DEBUG").is_ok() {
                eprintln!(
                    "[gzippy] WARNING: ISA-L compress failed on {} byte block, using libdeflate",
                    block.len()
                );
            }
            compress_block_bgzf_libdeflate(output, block, compression_level, header_info);
            return;
        }
    }

    // Trailer: CRC32 of the uncompressed data, then ISIZE (length truncated to u32).
    let crc32 = crc32fast::hash(block);
    output.extend_from_slice(&crc32.to_le_bytes());
    output.extend_from_slice(&(block.len() as u32).to_le_bytes());

    // Patch the placeholder with the total member length (header through trailer).
    let total_block_size = output.len() - header_start;
    output[block_size_offset..block_size_offset + 4]
        .copy_from_slice(&(total_block_size as u32).to_le_bytes());
}

/// Split `data` into content-defined blocks for rsync-friendly compression.
///
/// A running sum of the most recent 8 KiB of input is maintained (wrapping
/// `u32` arithmetic). A block ends at the current byte when the block is at
/// least 32 KiB long and either
/// - the low 17 bits of the sum are all ones (mask `0x1FFFF`), or
/// - the block has reached 512 KiB.
///
/// Because the sum depends only on the trailing window, a local edit to the
/// input tends to move only nearby boundaries.
///
/// Input of 32 KiB or less (including empty input) is returned as a single
/// block. The returned slices are contiguous, in order, and cover all of
/// `data`; only the final block may be shorter than 32 KiB.
pub fn split_rsyncable(data: &[u8]) -> Vec<&[u8]> {
    const WINDOW: usize = 8192;
    const MASK: u32 = 0x1FFFF;
    const MIN_BLOCK: usize = 32 * 1024;
    const MAX_BLOCK: usize = 512 * 1024;

    if data.len() <= MIN_BLOCK {
        return vec![data];
    }

    let mut chunks = Vec::new();
    let mut start = 0usize;
    let mut window_sum: u32 = 0;

    for (pos, &byte) in data.iter().enumerate() {
        // Slide the window: take in the new byte, drop the one that fell out.
        window_sum = window_sum.wrapping_add(u32::from(byte));
        if pos >= WINDOW {
            window_sum = window_sum.wrapping_sub(u32::from(data[pos - WINDOW]));
        }

        let end = pos + 1;
        let len = end - start;
        if len < MIN_BLOCK {
            continue;
        }

        let triggered = window_sum & MASK == MASK;
        if triggered || len >= MAX_BLOCK {
            chunks.push(&data[start..end]);
            start = end;
        }
    }

    // Whatever is left after the last boundary forms the final block.
    if start < data.len() {
        chunks.push(&data[start..]);
    }

    chunks
}

/// Compress `data` as a series of gzip members whose boundaries come from
/// `split_rsyncable`, writing them to `writer` in input order.
///
/// Every block is compressed independently with
/// `compress_block_bgzf_libdeflate`. With a single block or
/// `num_threads <= 1` the blocks are compressed and written one at a time.
/// Otherwise up to `num_threads` scoped threads claim blocks from a shared
/// counter, and all compressed blocks are held in memory until every thread
/// has finished; only then is anything written.
///
/// Returns the total number of uncompressed bytes processed.
///
/// # Errors
/// Propagates any I/O error from `writer`.
pub fn compress_rsyncable<W: Write + Send>(
    data: &[u8],
    compression_level: u32,
    num_threads: usize,
    header_info: &GzipHeaderInfo,
    mut writer: W,
) -> io::Result<u64> {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Mutex;

    let blocks = split_rsyncable(data);
    if blocks.is_empty() {
        return Ok(0);
    }

    // Sequential path: one block, or no parallelism requested.
    if blocks.len() == 1 || num_threads <= 1 {
        let mut consumed = 0u64;
        // The compressor clears its output buffer, so one buffer serves all blocks.
        let mut member = Vec::new();
        for block in &blocks {
            compress_block_bgzf_libdeflate(&mut member, block, compression_level, header_info);
            writer.write_all(&member)?;
            consumed += block.len() as u64;
        }
        return Ok(consumed);
    }

    // Parallel path: one output slot per block, filled by whichever worker claims it.
    let block_count = blocks.len();
    let cursor = AtomicUsize::new(0);
    let slots: Vec<Mutex<Vec<u8>>> = std::iter::repeat_with(|| Mutex::new(Vec::new()))
        .take(block_count)
        .collect();
    let workers = num_threads.min(block_count);

    std::thread::scope(|scope| {
        for _ in 0..workers {
            scope.spawn(|| loop {
                // Each index is handed out exactly once.
                let idx = cursor.fetch_add(1, Ordering::Relaxed);
                let block = match blocks.get(idx) {
                    Some(block) => *block,
                    None => break,
                };
                let mut slot = slots[idx].lock().unwrap();
                compress_block_bgzf_libdeflate(&mut slot, block, compression_level, header_info);
            });
        }
    });

    // All workers have joined; emit the members in their original order.
    let mut consumed = 0u64;
    for (block, slot) in blocks.iter().zip(&slots) {
        let member = slot.lock().unwrap();
        writer.write_all(&member)?;
        consumed += block.len() as u64;
    }

    Ok(consumed)
}

#[cfg(test)]
mod tests {
    use super::*;
    use std::io::Cursor;

    /// A tiny payload compressed by `ParallelGzEncoder::new(6, 4)` must
    /// decode back to the original bytes with a plain gzip decoder.
    #[test]
    fn test_parallel_compress_small() {
        let payload: &[u8] = b"Hello, world!";

        let mut compressed = Vec::new();
        let encoder = ParallelGzEncoder::new(6, 4);
        encoder
            .compress(Cursor::new(payload), &mut compressed)
            .unwrap();

        // Decoding with flate2 doubles as the "is this valid gzip?" check.
        let mut restored = Vec::new();
        flate2::read::GzDecoder::new(compressed.as_slice())
            .read_to_end(&mut restored)
            .unwrap();

        assert_eq!(payload, restored.as_slice());
    }

    /// A ~1.4MB payload compressed by `ParallelGzEncoder::new(6, 4)` must
    /// decode back to the original bytes.
    #[test]
    fn test_parallel_compress_large() {
        // 14 bytes x 100_000 repetitions = ~1.4MB of input.
        let data = b"Hello, world! ".repeat(100000);

        let mut compressed = Vec::new();
        let encoder = ParallelGzEncoder::new(6, 4);
        encoder
            .compress(Cursor::new(&data), &mut compressed)
            .unwrap();

        // Decode with flate2's MultiGzDecoder, which keeps reading across
        // concatenated gzip members (a plain GzDecoder stops after the first).
        let mut restored = Vec::new();
        flate2::read::MultiGzDecoder::new(compressed.as_slice())
            .read_to_end(&mut restored)
            .unwrap();

        assert_eq!(data.as_slice(), restored.as_slice());
    }

    /// A single block produced by `compress_block_bgzf_isal` must start with
    /// the gzip magic bytes and decode back to the input.
    ///
    /// Only compiled when the `isal-compression` feature is enabled.
    #[test]
    #[cfg(feature = "isal-compression")]
    fn test_bgzf_isal_block_roundtrip() {
        // 200_000 bytes cycling through 0, 1, ..., 255.
        let data: Vec<u8> = (0u8..=255).cycle().take(200_000).collect();

        let mut output = Vec::new();
        let header = GzipHeaderInfo::default();
        compress_block_bgzf_isal(&mut output, &data, 1, &header);

        // Structural sanity: minimum length and gzip magic.
        assert!(output.len() >= 18, "output too short");
        assert_eq!(output[0], 0x1f, "bad gzip magic byte 0");
        assert_eq!(output[1], 0x8b, "bad gzip magic byte 1");

        // Functional check: an independent decoder recovers the input.
        let mut decompressed = Vec::new();
        flate2::read::GzDecoder::new(output.as_slice())
            .read_to_end(&mut decompressed)
            .unwrap();
        assert_eq!(
            decompressed, data,
            "ISA-L BGZF block must roundtrip correctly"
        );
    }

    /// Verification: libdeflate and zlib-ng both produce valid, decompressible gzip.
    ///
    /// The `bench_libdeflate_vs_zlib_ng_l1` benchmark in this module measures
    /// raw DEFLATE, but gzippy emits gzip format with headers. This test
    /// checks that both backends produce gzip output that decodes back to the
    /// original input. The outputs are not expected to be byte-identical;
    /// only the roundtrip is asserted.
    #[test]
    fn test_libdeflate_vs_zlib_ng_gzip_roundtrip() {
        use flate2::write::GzEncoder;
        use flate2::Compression;
        use std::io::Write;

        let data = b"Hello, world! ".repeat(1000); // 14KB test data

        // Compress with libdeflate L1 using compress_block_bgzf_libdeflate
        let header = GzipHeaderInfo::default();
        let mut output_libdeflate = Vec::new();
        compress_block_bgzf_libdeflate(&mut output_libdeflate, &data, 1, &header);

        // Compress with zlib-ng L1 using flate2.
        // Finish the stream explicitly: relying on Drop to write the gzip
        // trailer would silently discard any error raised while finalizing.
        let mut encoder = GzEncoder::new(Vec::new(), Compression::new(1));
        encoder.write_all(&data).expect("write failed");
        let output_zlib_ng = encoder.finish().expect("finish failed");

        // Both should decompress to original data
        let mut decoder1 = flate2::read::GzDecoder::new(&output_libdeflate[..]);
        let mut decompressed1 = Vec::new();
        decoder1.read_to_end(&mut decompressed1).unwrap();

        let mut decoder2 = flate2::read::GzDecoder::new(&output_zlib_ng[..]);
        let mut decompressed2 = Vec::new();
        decoder2.read_to_end(&mut decompressed2).unwrap();

        // Both must decompress to original
        assert_eq!(decompressed1, data, "libdeflate L1 roundtrip failed");
        assert_eq!(decompressed2, data, "zlib-ng L1 roundtrip failed");

        // Informational only (visible with --nocapture): compressed size and
        // compressed/original ratio for each backend.
        println!(
            "\nGzip roundtrip test passed:\n  libdeflate L1: {} bytes -> {:.2}% ratio\n  zlib-ng L1: {} bytes -> {:.2}% ratio",
            output_libdeflate.len(),
            (output_libdeflate.len() as f64 / data.len() as f64) * 100.0,
            output_zlib_ng.len(),
            (output_zlib_ng.len() as f64 / data.len() as f64) * 100.0
        );
    }

    /// Benchmark: libdeflate L1 vs zlib-ng L1 compression on arm64
    ///
    /// This test compares raw-DEFLATE compression throughput (MB/s) of:
    /// - libdeflate at level 1 (current gzippy choice for Tmax)
    /// - zlib-ng at level 1 (via flate2, alternative)
    ///
    /// Each compressor is run `RUNS` times over `benchmark_data/silesia.tar`;
    /// the reported throughput and compression ratio are means over those
    /// runs. If the data file is missing the benchmark prints a SKIP notice
    /// and returns without failing. Nothing in the body is architecture
    /// specific; the arm64 note describes where the numbers were of interest.
    ///
    /// Run with: cargo test --release -- --ignored bench_libdeflate_vs_zlib_ng_l1 --nocapture
    #[ignore]
    #[test]
    fn bench_libdeflate_vs_zlib_ng_l1() {
        use std::fs;
        use std::time::Instant;

        // Number of timed iterations per compressor.
        const RUNS: usize = 3;

        /// Arithmetic mean of the recorded wall-clock times (milliseconds).
        fn mean_ms(samples: &[f64]) -> f64 {
            samples.iter().sum::<f64>() / samples.len() as f64
        }

        // Load test data (silesia.tar = 202MB uncompressed, realistic workload)
        let test_file = "benchmark_data/silesia.tar";
        if !std::path::Path::new(test_file).exists() {
            eprintln!("SKIP: benchmark_data/silesia.tar not found");
            return;
        }

        let input = fs::read(test_file).expect("failed to read test file");
        let input_size_mb = input.len() as f64 / 1024.0 / 1024.0;
        println!("\nInput size: {:.2} MB", input_size_mb);
        println!("Running {} iterations of each compressor...\n", RUNS);

        // Mean compressed/original ratio in percent across all recorded runs.
        let mean_ratio_percent = |sizes: &[usize]| -> f64 {
            let total: f64 = sizes.iter().map(|&s| s as f64).sum();
            total / sizes.len() as f64 / input.len() as f64 * 100.0
        };

        // Benchmark libdeflate L1
        let mut libdeflate_times = Vec::new();
        let mut libdeflate_sizes = Vec::new();

        println!("=== libdeflate L1 ===");
        for run in 1..=RUNS {
            let output_size = LIBDEFLATE_COMPRESSOR.with(|cache| {
                let mut cache = cache.borrow_mut();
                let level = 1i32;
                // Reuse the thread-local compressor when it is already at the
                // requested level; otherwise replace it with a fresh one.
                let compressor = match cache.as_mut() {
                    Some((cached_level, comp)) if *cached_level == level => comp,
                    _ => {
                        use libdeflater::CompressionLvl;
                        let lvl = CompressionLvl::new(level).unwrap_or_default();
                        *cache = Some((level, libdeflater::Compressor::new(lvl)));
                        &mut cache.as_mut().unwrap().1
                    }
                };

                // Output buffer is allocated before the clock starts.
                let max_len = compressor.deflate_compress_bound(input.len());
                let mut output = vec![0u8; max_len];

                let start = Instant::now();
                let actual_len = compressor
                    .deflate_compress(&input, &mut output)
                    .expect("libdeflate compression failed");
                let elapsed_ms = start.elapsed().as_secs_f64() * 1000.0;

                libdeflate_times.push(elapsed_ms);
                actual_len
            });
            libdeflate_sizes.push(output_size);

            let throughput = input_size_mb / (libdeflate_times.last().unwrap() / 1000.0);
            let ratio = (output_size as f64 / input.len() as f64) * 100.0;
            println!(
                "  Run {}: {:.2} MB/s (ratio: {:.2}%, compressed: {:.2} MB)",
                run,
                throughput,
                ratio,
                output_size as f64 / 1024.0 / 1024.0
            );
        }

        let libdeflate_avg_ms = mean_ms(&libdeflate_times);
        let libdeflate_throughput = input_size_mb / (libdeflate_avg_ms / 1000.0);

        // Benchmark zlib-ng L1 (via flate2)
        let mut zlib_ng_times = Vec::new();
        let mut zlib_ng_sizes = Vec::new();

        println!("\n=== zlib-ng L1 (flate2) ===");
        for run in 1..=RUNS {
            use flate2::write::DeflateEncoder;
            use flate2::Compression;
            use std::io::Write;

            // Reserve the output up front, mirroring the libdeflate path, so
            // Vec growth/reallocation is not charged to zlib-ng's timed region.
            let mut output = Vec::with_capacity(input.len());
            let mut encoder = DeflateEncoder::new(&mut output, Compression::new(1));

            let start = Instant::now();
            encoder.write_all(&input).expect("zlib-ng write failed");
            encoder.finish().expect("zlib-ng finish failed");
            let elapsed_ms = start.elapsed().as_secs_f64() * 1000.0;

            zlib_ng_times.push(elapsed_ms);
            zlib_ng_sizes.push(output.len());

            let throughput = input_size_mb / (zlib_ng_times.last().unwrap() / 1000.0);
            let ratio = (output.len() as f64 / input.len() as f64) * 100.0;
            println!(
                "  Run {}: {:.2} MB/s (ratio: {:.2}%, compressed: {:.2} MB)",
                run,
                throughput,
                ratio,
                output.len() as f64 / 1024.0 / 1024.0
            );
        }

        let zlib_ng_avg_ms = mean_ms(&zlib_ng_times);
        let zlib_ng_throughput = input_size_mb / (zlib_ng_avg_ms / 1000.0);

        // Report results
        println!("\n=== RESULTS ===");
        println!(
            "libdeflate L1: {:.2} MB/s (avg compression: {:.2}%)",
            libdeflate_throughput,
            mean_ratio_percent(&libdeflate_sizes)
        );
        println!(
            "zlib-ng L1:    {:.2} MB/s (avg compression: {:.2}%)",
            zlib_ng_throughput,
            mean_ratio_percent(&zlib_ng_sizes)
        );

        // Positive => zlib-ng faster; negative => libdeflate faster.
        let diff_percent =
            ((zlib_ng_throughput - libdeflate_throughput) / libdeflate_throughput) * 100.0;
        println!("Difference:    {:.2}%", diff_percent);

        // Differences inside +/-2% are treated as measurement noise.
        if diff_percent > 2.0 {
            println!(
                "\nRESULT: zlib-ng L1 is FASTER by {:.2}%. But compression ratio is worse.",
                diff_percent
            );
        } else if diff_percent < -2.0 {
            println!(
                "\nRESULT: libdeflate L1 is FASTER by {:.2}%. Keep libdeflate.",
                -diff_percent
            );
        } else {
            println!("\nRESULT: Performance within noise (±2%). Keep current libdeflate.");
        }
    }
}