rebgzf 0.2.0

Efficient gzip to BGZF transcoder using Puffin-style half-decompression
Documentation
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
495
496
497
498
499
500
501
502
503
504
505
506
507
508
509
510
511
512
513
514
515
516
517
518
519
520
521
522
523
524
525
526
527
528
529
530
531
532
533
534
535
536
537
538
539
540
541
542
543
544
545
546
547
548
549
550
551
552
553
554
555
556
557
558
559
560
561
562
563
564
565
566
567
568
569
570
571
572
573
574
575
576
577
578
579
580
581
582
583
584
585
586
587
588
589
590
591
592
593
594
595
596
597
598
599
600
601
602
603
604
605
606
607
608
609
610
611
612
613
614
615
616
617
618
619
620
621
622
623
624
625
626
627
628
629
630
631
632
633
634
635
636
637
638
639
640
641
642
643
644
645
646
647
648
649
650
651
652
653
654
655
656
657
658
659
660
661
662
663
664
665
666
667
668
669
670
671
672
673
674
675
676
677
678
679
680
681
682
683
684
685
686
687
688
689
690
691
692
693
694
695
696
697
698
699
700
701
702
703
704
705
706
707
708
709
710
711
712
713
714
715
716
717
718
719
720
721
722
723
724
725
726
727
728
729
730
731
732
733
734
735
736
737
738
739
740
741
742
743
744
745
746
747
748
749
750
751
752
753
754
755
756
757
758
759
760
761
762
763
764
765
766
767
768
769
770
771
772
773
774
775
776
777
778
779
780
781
782
783
784
785
786
787
788
789
790
791
792
793
794
795
796
797
798
799
800
801
802
803
804
805
806
807
808
809
810
811
812
813
814
815
816
817
818
819
820
821
822
823
824
825
826
827
828
829
830
831
832
833
//! Parallel DEFLATE decode transcoder using pugz-style block scanning.
//!
//! Pipeline:
//! 1. Parse gzip header, compute DEFLATE region
//! 2. Divide region into N byte-offset chunks
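//! 3. Phase 1 (parallel): Scan for DEFLATE block boundaries at chunk starts
//!    (every chunk except the first, which starts at the first DEFLATE block)
//! 4. Phase 2 (parallel): Each thread decodes Huffman symbols from its region,
//!    emitting raw LZ77 tokens (Literal, Copy, EndOfBlock) without a sliding window.
//!    Phases 1 and 2 run back-to-back on the same thread, one thread per chunk.
//! 5. Phase 3: The main thread feeds all tokens, chunk by chunk in order, through
//!    BoundaryResolver to resolve cross-boundary references; encoding workers then
//!    compress the resolved blocks in parallel and the main thread writes them in order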

use std::collections::BTreeMap;
use std::io::{BufWriter, Write};
use std::sync::{Arc, Condvar, Mutex};

use crossbeam::channel::{bounded, Receiver, Sender};

use super::block_scanner::scan_for_block;
use super::boundary::BoundaryResolver;
use super::encoding::{
    buffer_and_write_block, encoding_worker, send_job_and_drain, write_single_block, EncodedBlock,
    EncodingJob,
};
use super::single::{parse_gzip_header_size, SingleThreadedTranscoder};
use super::splitter::{BlockSplitter, DefaultSplitter, FastqSplitter};
use crate::bgzf::{GziEntry, BGZF_EOF};
use crate::bits::{BitRead, SliceBitReader};
use crate::deflate::parser::parse_dynamic_huffman_tables;
use crate::deflate::tables::{DISTANCE_TABLE, LENGTH_TABLE};
use crate::deflate::LZ77Token;
use crate::error::{Error, Result};
use crate::huffman::HuffmanDecoder;
use crate::{FormatProfile, TranscodeConfig, TranscodeStats};

/// Smallest DEFLATE region (512 KiB) for which the parallel path is attempted;
/// anything smaller is handed to the single-threaded transcoder.
const MIN_REGION_BYTES: usize = 1 << 19;

/// Width (1 MiB) of the window, measured from a chunk's nominal start, that each
/// non-first thread searches for a DEFLATE block boundary.
const SCAN_WINDOW_BYTES: usize = 1 << 20;

/// Gzip-to-BGZF transcoder that decodes DEFLATE chunks on several threads.
///
/// Small inputs, single-thread configurations and multi-member files are
/// delegated to `SingleThreadedTranscoder`.
pub struct ParallelDecodeTranscoder {
    /// Minimum DEFLATE region size (bytes) before the parallel path is tried.
    min_region_bytes: usize,
    /// Transcoding options, also passed to the single-threaded fallback.
    config: TranscodeConfig,
}

impl ParallelDecodeTranscoder {
    pub fn new(config: TranscodeConfig) -> Self {
        Self { config, min_region_bytes: MIN_REGION_BYTES }
    }

    /// Override the minimum DEFLATE region size required for parallelism (for testing).
    #[cfg(test)]
    fn with_min_region_bytes(mut self, min_region_bytes: usize) -> Self {
        self.min_region_bytes = min_region_bytes;
        self
    }

    /// Transcode from a memory-mapped gzip byte slice to a writer.
    ///
    /// Falls back to single-threaded for multi-member gzip files (detected by
    /// checking for a valid gzip header after the first member's trailer).
    pub fn transcode_mmap<W: Write>(&mut self, data: &[u8], output: W) -> Result<TranscodeStats> {
        let header_size = parse_gzip_header_size(data)?;
        let deflate_end = data.len().saturating_sub(8);
        let num_threads = self.config.effective_threads();

        let region = deflate_end.saturating_sub(header_size);
        if region < self.min_region_bytes || num_threads <= 1 {
            return self.fallback(data, output);
        }

        // Detect multi-member gzip: check if there's a valid gzip header
        // after the first member's DEFLATE stream + 8-byte trailer.
        // This is cheaper than scanning the whole file for 1f 8b magic bytes
        // and avoids false positives from DEFLATE-compressed data.
        if is_multi_member(data, header_size) {
            return self.fallback(data, output);
        }

        let chunk_size = region / num_threads;
        self.scan_and_decode_streaming(
            data,
            header_size,
            chunk_size,
            num_threads,
            deflate_end,
            output,
        )
    }

    /// Main thread work for Phase 3: resolve boundaries, dispatch to workers, write output.
    fn resolve_dispatch_write<W: Write>(
        &self,
        data: &[u8],
        chunk_tokens: impl IntoIterator<Item = Vec<LZ77Token>>,
        job_tx: Sender<EncodingJob>,
        result_rx: Receiver<Result<EncodedBlock>>,
        output: W,
    ) -> Result<TranscodeStats> {
        let mut writer = BufWriter::with_capacity(self.config.buffer_size, output);
        let mut resolver = BoundaryResolver::new();

        // Smart boundary splitting (matching single-threaded path)
        let use_smart = self.config.use_smart_boundaries();
        let mut splitter: Box<dyn BlockSplitter> =
            if use_smart && self.config.format == FormatProfile::Fastq {
                Box::new(FastqSplitter::new())
            } else {
                Box::new(DefaultSplitter)
            };
        let max_block_size = if use_smart {
            (self.config.block_size as f64 * 1.1) as usize
        } else {
            self.config.block_size
        };

        let mut pending_tokens: Vec<LZ77Token> = Vec::with_capacity(32768);
        let mut pending_uncompressed_size: usize = 0;
        let mut block_start_position: u64 = 0;
        let mut next_block_id: u64 = 0;

        // Output ordering state
        let build_index = self.config.build_index;
        let mut blocks_written: u64 = 0;
        let mut output_bytes: u64 = 0;
        let mut index_entries: Vec<GziEntry> = Vec::new();
        let mut current_compressed_offset: u64 = 0;
        let mut current_uncompressed_offset: u64 = 0;
        let mut pending_blocks: BTreeMap<u64, EncodedBlock> = BTreeMap::new();
        let mut next_write_id: u64 = 0;

        // Iterate all tokens from all chunks, accumulating into pending_tokens.
        // Use into_iter to take ownership (avoids clone).
        for chunk in chunk_tokens {
            for token in chunk {
                if matches!(token, LZ77Token::EndOfBlock) {
                    continue;
                }

                let token_size = token.uncompressed_size();
                splitter.process_token(&token);

                let should_emit = if use_smart {
                    let near_target =
                        pending_uncompressed_size + token_size >= self.config.block_size;
                    let at_good_split = splitter.is_good_split_point();
                    let exceeds_max = pending_uncompressed_size + token_size > max_block_size;
                    !pending_tokens.is_empty() && ((near_target && at_good_split) || exceeds_max)
                } else {
                    pending_uncompressed_size + token_size > self.config.block_size
                        && !pending_tokens.is_empty()
                };

                if should_emit {
                    let (resolved, crc, uncompressed_size) =
                        resolver.resolve_block(block_start_position, &pending_tokens);

                    let job = EncodingJob {
                        block_id: next_block_id,
                        tokens: resolved,
                        uncompressed_size,
                        crc,
                    };
                    next_block_id += 1;

                    send_job_and_drain(
                        &job_tx,
                        &result_rx,
                        job,
                        &mut writer,
                        &mut pending_blocks,
                        &mut next_write_id,
                        &mut blocks_written,
                        &mut output_bytes,
                        build_index,
                        &mut index_entries,
                        &mut current_compressed_offset,
                        &mut current_uncompressed_offset,
                    )?;

                    block_start_position = resolver.position();
                    pending_tokens.clear();
                    pending_uncompressed_size = 0;
                    splitter.reset();
                }

                pending_uncompressed_size += token_size;
                pending_tokens.push(token); // moved, not cloned
            }
        }

        // Flush remaining tokens (must use send_job_and_drain to avoid deadlock —
        // a blocking send here can deadlock if both channels are full and workers
        // are blocked on result_tx.send while the main thread blocks on job_tx.send)
        if !pending_tokens.is_empty() {
            let (resolved, crc, uncompressed_size) =
                resolver.resolve_block(block_start_position, &pending_tokens);
            let job =
                EncodingJob { block_id: next_block_id, tokens: resolved, uncompressed_size, crc };
            next_block_id += 1;
            send_job_and_drain(
                &job_tx,
                &result_rx,
                job,
                &mut writer,
                &mut pending_blocks,
                &mut next_write_id,
                &mut blocks_written,
                &mut output_bytes,
                build_index,
                &mut index_entries,
                &mut current_compressed_offset,
                &mut current_uncompressed_offset,
            )?;
        }

        // Signal workers to stop
        drop(job_tx);

        // Drain remaining results
        while blocks_written + (pending_blocks.len() as u64) < next_block_id {
            match result_rx.recv() {
                Ok(result) => {
                    let block = result?;
                    buffer_and_write_block(
                        &mut writer,
                        block,
                        &mut pending_blocks,
                        &mut next_write_id,
                        &mut blocks_written,
                        &mut output_bytes,
                        build_index,
                        &mut index_entries,
                        &mut current_compressed_offset,
                        &mut current_uncompressed_offset,
                    )?;
                }
                Err(_) => break,
            }
        }

        // Write any remaining buffered blocks
        while let Some(block) = pending_blocks.remove(&next_write_id) {
            write_single_block(
                &mut writer,
                &block.data,
                block.uncompressed_size,
                &mut output_bytes,
                build_index,
                &mut index_entries,
                &mut current_compressed_offset,
                &mut current_uncompressed_offset,
            )?;
            blocks_written += 1;
            next_write_id += 1;
        }

        // Write EOF
        writer.write_all(&BGZF_EOF).map_err(Error::Io)?;
        output_bytes += 28;
        writer.flush().map_err(Error::Io)?;

        let (resolved, _) = resolver.stats();

        Ok(TranscodeStats {
            input_bytes: data.len() as u64,
            output_bytes,
            blocks_written,
            boundary_refs_resolved: resolved,
            copied_directly: false,
            index_entries: if build_index { Some(index_entries) } else { None },
        })
    }

    /// Combined scan + decode for all chunks in parallel, with streaming handoff.
    ///
    /// Each thread (except thread 0) scans for candidate boundaries and tries
    /// decoding from each until one produces a substantial token count. Decoded
    /// tokens are deposited into shared slots and Phase 3 consumes them in order
    /// as they become available, so peak memory holds at most ~2 chunk token sets.
    ///
    /// **Note on chunk ownership**: Thread K's `stop_byte` is the nominal chunk boundary
    /// `header_size + (K+1) * chunk_size`, while thread K+1's start is its discovered
    /// DEFLATE block boundary (which may be slightly before or after that nominal position).
    /// In theory this can produce a small overlap or gap in decoded tokens. In practice,
    /// the sequential `BoundaryResolver` in Phase 3 processes all tokens in order and
    /// resolves cross-boundary LZ77 references correctly, so minor overlap is harmless.
    fn scan_and_decode_streaming<W: Write>(
        &self,
        data: &[u8],
        header_size: usize,
        chunk_size: usize,
        num_threads: usize,
        deflate_end: usize,
        output: W,
    ) -> Result<TranscodeStats> {
        // Shared slots for each chunk's decoded tokens.
        // Phase 2 threads deposit tokens; Phase 3 takes them in order.
        type ChunkSlot = Arc<(Mutex<Option<Vec<LZ77Token>>>, Condvar)>;
        let slots: Vec<ChunkSlot> =
            (0..num_threads).map(|_| Arc::new((Mutex::new(None), Condvar::new()))).collect();

        let encoding_threads = self.config.effective_threads();
        let channel_capacity = encoding_threads * 4;
        let use_fixed_huffman = self.config.use_fixed_huffman();

        let (job_tx, job_rx): (Sender<EncodingJob>, Receiver<EncodingJob>) =
            bounded(channel_capacity);
        let (result_tx, result_rx): (Sender<Result<EncodedBlock>>, Receiver<Result<EncodedBlock>>) =
            bounded(channel_capacity);

        let result = crossbeam::scope(|scope| {
            // Spawn Phase 2 decode threads
            for (k, slot) in slots.iter().enumerate() {
                let slot = Arc::clone(slot);
                let stop_byte = if k + 1 < num_threads {
                    header_size + (k + 1) * chunk_size
                } else {
                    deflate_end
                };

                scope.spawn(move |_| {
                    let tokens = if k == 0 {
                        let start_bit = header_size * 8;
                        let stop_bit = stop_byte * 8;
                        decode_chunk_tokens(data, start_bit, stop_bit).ok().unwrap_or_default()
                    } else {
                        scan_and_decode_chunk(
                            data,
                            header_size + k * chunk_size,
                            stop_byte,
                            deflate_end,
                        )
                        .map(|(_, t)| t)
                        .unwrap_or_default()
                    };

                    let (lock, cvar) = &*slot;
                    let mut guard = lock.lock().unwrap();
                    *guard = Some(tokens);
                    cvar.notify_one();
                });
            }

            // Spawn Phase 3 encoding workers
            for _ in 0..encoding_threads {
                let rx = job_rx.clone();
                let tx = result_tx.clone();
                scope.spawn(move |_| {
                    encoding_worker(rx, tx, use_fixed_huffman);
                });
            }
            drop(job_rx);
            drop(result_tx);

            // Main thread: consume chunk slots in order → resolve → dispatch → write.
            // Each chunk's tokens are taken and dropped after processing, so peak
            // memory holds at most ~2 chunk token sets (the one being resolved +
            // one being written by the encoding workers).
            let chunk_tokens_iter = slots.into_iter().filter_map(|slot| {
                let (lock, cvar) = &*slot;
                let mut guard = lock.lock().unwrap();
                while guard.is_none() {
                    guard = cvar.wait(guard).unwrap();
                }
                let tokens = guard.take().unwrap();
                if tokens.is_empty() {
                    None
                } else {
                    Some(tokens)
                }
            });

            self.resolve_dispatch_write(data, chunk_tokens_iter, job_tx, result_rx, output)
        });

        result.map_err(|_| Error::Internal("Phase 2/3 thread panicked".into()))?
    }

    fn fallback<W: Write>(&self, data: &[u8], output: W) -> Result<TranscodeStats> {
        let mut single = SingleThreadedTranscoder::new(self.config.clone());
        single.transcode_slice(data, output)
    }
}

/// Check if mmap'd data contains multiple gzip members by decoding the first
/// member's DEFLATE stream to find its end, then checking for another header.
///
/// This follows the actual DEFLATE structure rather than scanning for magic bytes
/// (which can appear inside compressed data as false positives).
fn is_multi_member(data: &[u8], header_size: usize) -> bool {
    // Decode the first member to find where BFINAL is, then check after trailer
    let start_bit = header_size * 8;
    let end_bit = data.len() * 8;

    let mut bits = SliceBitReader::new(data);
    let start_byte = start_bit / 8;
    let start_bit_offset = (start_bit % 8) as u8;
    bits.set_bit_position(start_byte, start_bit_offset);

    // Walk DEFLATE blocks until BFINAL
    loop {
        let (cur_byte, cur_bit) = bits.bit_position();
        let cur_abs_bit = cur_byte * 8 + cur_bit as usize;
        if cur_abs_bit >= end_bit {
            return false; // Ran out of data — single member
        }

        let bfinal = match bits.read_bits(1) {
            Ok(v) => v != 0,
            Err(_) => return false,
        };
        let btype = match bits.read_bits(2) {
            Ok(v) => v,
            Err(_) => return false,
        };

        match btype {
            0 => {
                // Stored block: skip LEN + NLEN + data
                bits.align_to_byte();
                let len = match bits.read_u16_le() {
                    Ok(v) => v,
                    Err(_) => return false,
                };
                let nlen = match bits.read_u16_le() {
                    Ok(v) => v,
                    Err(_) => return false,
                };
                if len != !nlen {
                    return false;
                }
                // Skip `len` bytes of stored data
                for _ in 0..len {
                    if bits.read_bits(8).is_err() {
                        return false;
                    }
                }
            }
            1 | 2 => {
                // Huffman block: must decode symbols to find EOB (symbol 256)
                let (lit_decoder, dist_decoder) = if btype == 1 {
                    (HuffmanDecoder::fixed_literal_length(), Some(HuffmanDecoder::fixed_distance()))
                } else {
                    match parse_dynamic_huffman_tables(&mut bits) {
                        Ok((lit, dist)) => (lit, dist),
                        Err(_) => return false,
                    }
                };

                // Decode symbols until EOB, discarding them
                loop {
                    let sym = match lit_decoder.decode(&mut bits) {
                        Ok(s) => s,
                        Err(_) => return false,
                    };
                    if sym == 256 {
                        break; // EOB
                    }
                    if sym > 256 {
                        // Length code: skip extra bits + distance code + extra bits
                        if sym > 285 {
                            return false;
                        }
                        let len_idx = (sym - 257) as usize;
                        let (_, extra_bits) = LENGTH_TABLE[len_idx];
                        if extra_bits > 0 && bits.read_bits(extra_bits).is_err() {
                            return false;
                        }
                        let dist_dec = match &dist_decoder {
                            Some(d) => d,
                            None => return false,
                        };
                        let dist_sym = match dist_dec.decode(&mut bits) {
                            Ok(s) => s,
                            Err(_) => return false,
                        };
                        if dist_sym > 29 {
                            return false;
                        }
                        let (_, dist_extra) = DISTANCE_TABLE[dist_sym as usize];
                        if dist_extra > 0 && bits.read_bits(dist_extra).is_err() {
                            return false;
                        }
                    }
                }
            }
            _ => return false, // Reserved block type
        }

        if bfinal {
            // Found end of first member's DEFLATE stream.
            // Align to byte boundary, skip 8-byte trailer (CRC32 + ISIZE).
            bits.align_to_byte();
            let (trailer_byte, _) = bits.bit_position();
            let after_trailer = trailer_byte + 8;

            // Check if a valid gzip header follows
            if after_trailer + 10 <= data.len() {
                return parse_gzip_header_size(&data[after_trailer..]).is_ok();
            }
            return false; // No room for another member
        }
    }
}

/// A probe decode must yield at least this many LZ77 tokens before its candidate
/// boundary is accepted as real.
const MIN_PROBE_TOKENS: usize = 1_000;

/// How much compressed input (64 KiB) past a candidate boundary the probe decode is
/// asked to cover. Decoding only stops between blocks, so a probe may run on to the
/// end of the block it is in.
const PROBE_BYTES: usize = 1 << 16;

/// Scan for candidate boundaries and try decoding from each (rapidgzip-style).
///
/// Candidates come from `scan_for_block`, searched from `scan_start_byte` over at most
/// `SCAN_WINDOW_BYTES` (clamped to `deflate_end`). For each candidate:
/// 1. Probe-decode up to `PROBE_BYTES` of compressed data, but never past `stop_byte`.
/// 2. If the probe produces at least `MIN_PROBE_TOKENS` tokens, the candidate is
///    treated as real and decoded to `stop_byte`. When the probe was already capped at
///    `stop_byte`, its tokens are the full decode and are returned as-is.
/// 3. Otherwise it is a false positive, and scanning resumes one bit later.
///
/// Because the probe is capped at `stop_byte`, a candidate at or beyond `stop_byte`
/// decodes no tokens and is always rejected. Only boundaries before `stop_byte` can be
/// accepted.
///
/// Returns the accepted boundary's absolute bit offset with its decoded tokens, or
/// `None` if no candidate in the window was accepted.
fn scan_and_decode_chunk(
    data: &[u8],
    scan_start_byte: usize,
    stop_byte: usize,
    deflate_end: usize,
) -> Option<(usize, Vec<LZ77Token>)> {
    let scan_end_byte = (scan_start_byte + SCAN_WINDOW_BYTES).min(deflate_end);
    let stop_bit = stop_byte * 8;

    let mut search_from = scan_start_byte * 8;
    let search_end = scan_end_byte * 8;

    while search_from < search_end {
        // No further structural candidates in the window: give up.
        let candidate = scan_for_block(data, search_from, search_end)?;
        let start_bit = candidate.bit_offset;

        // Stage 1: probe decode, bounded by both PROBE_BYTES and the chunk's stop.
        let probe_stop = (start_bit + PROBE_BYTES * 8).min(stop_bit);
        match decode_chunk_tokens(data, start_bit, probe_stop) {
            Ok(probe) if probe.len() >= MIN_PROBE_TOKENS => {
                if probe_stop == stop_bit {
                    // Same start and stop as the full decode would use, and decoding is
                    // deterministic: reuse the probe rather than decoding twice.
                    return Some((start_bit, probe));
                }
                // Stage 2: full decode from this boundary to the chunk's stop.
                match decode_chunk_tokens(data, start_bit, stop_bit) {
                    Ok(tokens) if !tokens.is_empty() => return Some((start_bit, tokens)),
                    _ => {}
                }
            }
            _ => {}
        }

        search_from = start_bit + 1;
    }

    None
}

/// Decode DEFLATE blocks from `start_bit` to `stop_bit`, emitting raw LZ77 tokens.
///
/// Decoding proceeds in whole blocks. `stop_bit` is only checked between blocks, so the
/// block that straddles it is decoded to its end. Decoding also ends after a block
/// with BFINAL set. Each completed block is terminated by an `LZ77Token::EndOfBlock`.
///
/// This does NOT maintain a sliding window. Copy tokens are emitted as-is with their
/// length and distance values; the sequential `BoundaryResolver` in Phase 3 handles
/// all context resolution.
///
/// # Errors
///
/// Returns an error when dynamic Huffman tables fail to parse or a Huffman block holds
/// an invalid symbol (from `parse_dynamic_huffman_tables` / `decode_huffman_block`).
/// The following conditions are not errors. They stop decoding early and return the
/// tokens decoded so far:
/// - running out of input in a block header or in stored-block data;
/// - an invalid stored-block LEN/NLEN pair;
/// - a reserved block type.
fn decode_chunk_tokens(data: &[u8], start_bit: usize, stop_bit: usize) -> Result<Vec<LZ77Token>> {
    let mut tokens = Vec::with_capacity(65536);
    let mut bits = SliceBitReader::new(data);

    let start_byte = start_bit / 8;
    let start_bit_offset = (start_bit % 8) as u8;
    bits.set_bit_position(start_byte, start_bit_offset);

    loop {
        // Check if we've reached or passed the stop position (only between blocks)
        let (cur_byte, cur_bit) = bits.bit_position();
        let cur_abs_bit = cur_byte * 8 + cur_bit as usize;
        if cur_abs_bit >= stop_bit {
            break;
        }

        // Read BFINAL and BTYPE
        let bfinal = match bits.read_bits(1) {
            Ok(v) => v != 0,
            Err(_) => break,
        };
        let btype = match bits.read_bits(2) {
            Ok(v) => v,
            Err(_) => break,
        };

        match btype {
            0 => {
                // Stored block: align to byte, read LEN/NLEN, emit literals
                bits.align_to_byte();
                let len = match bits.read_u16_le() {
                    Ok(v) => v,
                    Err(_) => break,
                };
                let nlen = match bits.read_u16_le() {
                    Ok(v) => v,
                    Err(_) => break,
                };
                if len != !nlen {
                    break; // Invalid stored block
                }
                tokens.reserve(usize::from(len));
                let mut stored_complete = true;
                for _ in 0..len {
                    match bits.read_bits(8) {
                        Ok(b) => tokens.push(LZ77Token::Literal(b as u8)),
                        Err(_) => {
                            // Input ended inside the stored block: stop decoding and
                            // return what was read (no EndOfBlock for this block).
                            stored_complete = false;
                            break;
                        }
                    }
                }
                if !stored_complete {
                    return Ok(tokens);
                }
                tokens.push(LZ77Token::EndOfBlock);
            }
            1 => {
                // Fixed Huffman
                let lit_decoder = HuffmanDecoder::fixed_literal_length();
                let dist_decoder = HuffmanDecoder::fixed_distance();
                decode_huffman_block(&mut bits, &lit_decoder, Some(&dist_decoder), &mut tokens)?;
            }
            2 => {
                // Dynamic Huffman
                let (lit_decoder, dist_decoder) = parse_dynamic_huffman_tables(&mut bits)?;
                decode_huffman_block(&mut bits, &lit_decoder, dist_decoder.as_ref(), &mut tokens)?;
            }
            _ => break, // Reserved block type
        }

        if bfinal {
            break;
        }
    }

    Ok(tokens)
}

/// Decode Huffman symbols from a single DEFLATE block, emitting LZ77 tokens.
/// No sliding window is maintained.
fn decode_huffman_block(
    bits: &mut SliceBitReader<'_>,
    lit_decoder: &HuffmanDecoder,
    dist_decoder: Option<&HuffmanDecoder>,
    tokens: &mut Vec<LZ77Token>,
) -> Result<()> {
    loop {
        let sym = lit_decoder.decode(bits)?;

        if sym <= 255 {
            tokens.push(LZ77Token::Literal(sym as u8));
            continue;
        }

        if sym == 256 {
            tokens.push(LZ77Token::EndOfBlock);
            break;
        }

        // Length code (257..=285)
        if sym > 285 {
            return Err(Error::InvalidLengthCode(sym));
        }

        let len_idx = (sym - 257) as usize;
        let (base_len, extra_bits) = LENGTH_TABLE[len_idx];
        let extra = if extra_bits > 0 { bits.read_bits(extra_bits)? } else { 0 };
        let length = base_len + extra as u16;

        // Read distance
        let dist_dec = dist_decoder.ok_or(Error::InvalidDistanceCode(0))?;
        let dist_sym = dist_dec.decode(bits)?;
        if dist_sym > 29 {
            return Err(Error::InvalidDistanceCode(dist_sym));
        }

        let (base_dist, dist_extra_bits) = DISTANCE_TABLE[dist_sym as usize];
        let dist_extra = if dist_extra_bits > 0 { bits.read_bits(dist_extra_bits)? } else { 0 };
        let distance = base_dist + dist_extra as u16;

        tokens.push(LZ77Token::Copy { length, distance });
    }

    Ok(())
}

#[cfg(test)]
mod tests {
    use super::*;

    /// Build `num_reads` synthetic FASTQ records that differ only in their read name.
    fn make_fastq(num_reads: usize) -> Vec<u8> {
        let mut fastq = Vec::with_capacity(num_reads * 80);
        for read in 0..num_reads {
            fastq.extend_from_slice(format!("@SEQ_{read}\n").as_bytes());
            fastq.extend_from_slice(
                b"ACGTACGTACGTACGTACGTACGTACGTACGT\n+\nIIIIIIIIIIIIIIIIIIIIIIIIIIIIIIII\n",
            );
        }
        fastq
    }

    /// Compress `input` as a single gzip member at the default level.
    fn gzip_compress(input: &[u8]) -> Vec<u8> {
        use std::io::Write as IoWrite;
        let mut encoder =
            flate2::write::GzEncoder::new(Vec::new(), flate2::Compression::default());
        encoder.write_all(input).unwrap();
        encoder.finish().unwrap()
    }

    /// Decompress every gzip/BGZF member in `data`.
    fn gzip_decompress(data: &[u8]) -> Vec<u8> {
        use std::io::Read as IoRead;
        let mut plain = Vec::new();
        flate2::read::MultiGzDecoder::new(data).read_to_end(&mut plain).unwrap();
        plain
    }

    /// Four-thread config for the parallel tests.
    fn four_threads() -> TranscodeConfig {
        TranscodeConfig { num_threads: 4, ..Default::default() }
    }

    /// Four-thread transcoder forced onto the parallel path regardless of input size.
    fn forced_parallel() -> ParallelDecodeTranscoder {
        ParallelDecodeTranscoder::new(four_threads()).with_min_region_bytes(0)
    }

    #[test]
    fn test_parallel_roundtrip() {
        let original = make_fastq(10_000);
        let gz = gzip_compress(&original);

        let mut bgzf = Vec::new();
        let stats = forced_parallel().transcode_mmap(&gz, &mut bgzf).unwrap();

        assert!(stats.blocks_written >= 1);
        assert_eq!(gzip_decompress(&bgzf), original);
    }

    #[test]
    fn test_parallel_matches_single_threaded() {
        let gz = gzip_compress(&make_fastq(10_000));

        let mut single_out = Vec::new();
        SingleThreadedTranscoder::new(TranscodeConfig::default())
            .transcode_slice(&gz, &mut single_out)
            .unwrap();

        // min_region_bytes = 0 ensures the parallel path is taken
        let mut parallel_out = Vec::new();
        forced_parallel().transcode_mmap(&gz, &mut parallel_out).unwrap();

        assert_eq!(gzip_decompress(&single_out), gzip_decompress(&parallel_out));
    }

    #[test]
    fn test_is_multi_member_single() {
        let gz = gzip_compress(b"hello world");
        let header_size = parse_gzip_header_size(&gz).unwrap();
        assert!(!is_multi_member(&gz, header_size));
    }

    #[test]
    fn test_is_multi_member_two_members() {
        let concat = [gzip_compress(b"hello"), gzip_compress(b"world")].concat();
        let header_size = parse_gzip_header_size(&concat).unwrap();
        assert!(is_multi_member(&concat, header_size));
    }

    #[test]
    fn test_multi_member_roundtrip() {
        // Multi-member input must fall back to single-threaded and still round-trip
        let first = make_fastq(500);
        let second = make_fastq(500);
        let concat_gz = [gzip_compress(&first), gzip_compress(&second)].concat();

        let mut bgzf = Vec::new();
        forced_parallel().transcode_mmap(&concat_gz, &mut bgzf).unwrap();

        assert_eq!(gzip_decompress(&bgzf), [first, second].concat());
    }

    #[test]
    fn test_falls_back_for_small_input() {
        let original = make_fastq(10);
        let gz = gzip_compress(&original);

        // Default min_region_bytes: this tiny input takes the fallback path
        let mut bgzf = Vec::new();
        let stats =
            ParallelDecodeTranscoder::new(four_threads()).transcode_mmap(&gz, &mut bgzf).unwrap();

        assert!(stats.blocks_written >= 1);
        assert_eq!(gzip_decompress(&bgzf), original);
    }
}