Skip to main content

oxiarc_deflate/
streaming.rs

//! Streaming compression and decompression for GZIP and Zlib formats.
//!
//! Provides [`GzipStreamEncoder`] / [`GzipStreamDecoder`] and
//! [`ZlibStreamEncoder`] / [`ZlibStreamDecoder`] that implement the standard
//! [`std::io::Write`] and [`std::io::Read`] traits respectively, enabling
//! incremental processing of compressed data through standard Rust I/O
//! pipelines.
//!
//! # Design
//!
//! **Encoders** maintain a single GZIP/Zlib stream with a persistent
//! [`Deflater`] and running CRC-32 (GZIP) or
//! Adler-32 (Zlib) checksums. Flush methods (`sync_flush`, `full_flush`,
//! `partial_flush`) emit DEFLATE blocks within the *same* stream rather
//! than starting new concatenated members.
//!
//! When the internal buffer reaches `block_size` bytes it is automatically
//! flushed via `sync_flush`. Any remaining data is flushed when
//! [`finish`](GzipStreamEncoder::finish) is called.
//!
//! **Decoders** eagerly read all compressed data from the inner reader on the
//! first `read` call, decompress it, and serve from an internal buffer. This
//! matches the pattern used by `oxiarc-zstd`.
//!
//! # Example
//!
//! ```rust
//! use std::io::{Read, Write};
//! use oxiarc_deflate::streaming::{GzipStreamEncoder, GzipStreamDecoder};
//!
//! // Compress
//! let mut encoder = GzipStreamEncoder::new(Vec::new(), 6);
//! encoder.write_all(b"Hello, streaming gzip!").expect("write failed");
//! let compressed = encoder.finish().expect("finish failed");
//!
//! // Decompress
//! let mut decoder = GzipStreamDecoder::new(&compressed[..]);
//! let mut output = String::new();
//! decoder.read_to_string(&mut output).expect("read failed");
//! assert_eq!(output, "Hello, streaming gzip!");
//! ```
42
43use crate::deflate::Deflater;
44use crate::inflate::Inflater;
45use crate::zlib::{Adler32, zlib_decompress};
46use oxiarc_core::{BitReader, Crc32};
47use std::io::{self, Cursor, Read, Write};
48
/// Number of buffered input bytes that triggers an automatic encoder flush
/// (128 KiB).
const DEFAULT_BLOCK_SIZE: usize = 1 << 17;

// ---------------------------------------------------------------------------
// GZIP constants
// ---------------------------------------------------------------------------

/// First GZIP magic byte (ID1).
const GZIP_ID1: u8 = 0x1f;
/// Second GZIP magic byte (ID2).
const GZIP_ID2: u8 = 0x8b;
/// Compression method identifier for DEFLATE.
const GZIP_CM_DEFLATE: u8 = 0x08;
/// FLG byte value with no optional header fields enabled.
const GZIP_FLG_NONE: u8 = 0x00;
/// OS byte value meaning "unknown" (255).
const GZIP_OS_UNKNOWN: u8 = 0xff;
65
66// ---------------------------------------------------------------------------
67// GzipStreamEncoder
68// ---------------------------------------------------------------------------
69
/// Streaming GZIP encoder that implements [`Write`].
///
/// Uncompressed input is staged in an internal buffer and pushed through a
/// single persistent [`Deflater`], so every flush (`sync_flush`,
/// `full_flush`, `partial_flush`) adds DEFLATE blocks to the same GZIP
/// stream. A running CRC-32 and byte count are updated each time the buffer
/// is drained and are written out as the trailer by `finish`.
///
/// **Important:** you *must* call [`finish`](GzipStreamEncoder::finish) to
/// write the GZIP trailer. Dropping the encoder without calling `finish`
/// will produce an incomplete GZIP stream.
pub struct GzipStreamEncoder<W: Write> {
    /// The wrapped writer that receives compressed output. `finish` takes it
    /// out of the `Option` and hands it back to the caller.
    inner: Option<W>,
    /// Uncompressed bytes accepted by `write` but not yet compressed.
    buffer: Vec<u8>,
    /// Persistent DEFLATE compressor shared by all flushes.
    deflater: Deflater,
    /// Running CRC-32 over all uncompressed data drained so far.
    crc: Crc32,
    /// Total number of uncompressed bytes drained so far; truncated to its
    /// low 32 bits when written as ISIZE.
    total_in: u64,
    /// Whether the GZIP header has been written yet.
    header_written: bool,
    /// Whether `finish` has already been called.
    finished: bool,
    /// Buffer length at which `write` triggers an automatic sync flush.
    block_size: usize,
}
97
98impl<W: Write> GzipStreamEncoder<W> {
99    /// Create a new streaming GZIP encoder wrapping `writer`.
100    ///
101    /// The `level` parameter controls the DEFLATE compression level (0-9).
102    pub fn new(writer: W, level: u8) -> Self {
103        Self {
104            inner: Some(writer),
105            buffer: Vec::new(),
106            deflater: Deflater::new(level.min(9)),
107            crc: Crc32::new(),
108            total_in: 0,
109            header_written: false,
110            finished: false,
111            block_size: DEFAULT_BLOCK_SIZE,
112        }
113    }
114
115    /// Set the block size used for incremental flushing.
116    ///
117    /// When the internal buffer reaches this many bytes it is automatically
118    /// flushed via sync_flush.
119    pub fn with_block_size(mut self, block_size: usize) -> Self {
120        self.block_size = block_size.max(1);
121        self
122    }
123
124    /// Write the 10-byte GZIP header if not already written.
125    fn ensure_header(&mut self) -> io::Result<()> {
126        if self.header_written {
127            return Ok(());
128        }
129        let header = [
130            GZIP_ID1,
131            GZIP_ID2,
132            GZIP_CM_DEFLATE,
133            GZIP_FLG_NONE,
134            0,
135            0,
136            0,
137            0, // MTIME = 0
138            0, // XFL = 0
139            GZIP_OS_UNKNOWN,
140        ];
141        if let Some(ref mut w) = self.inner {
142            w.write_all(&header)?;
143        }
144        self.header_written = true;
145        Ok(())
146    }
147
148    /// Drain the internal buffer through the deflater using a sync flush.
149    ///
150    /// A sync flush emits a compressed block followed by an empty stored
151    /// block (`0x00 0x00 0xFF 0xFF`), which byte-aligns the stream and
152    /// allows a decoder to decode all data fed so far.
153    pub fn sync_flush(&mut self) -> io::Result<()> {
154        self.ensure_header()?;
155        let data = std::mem::take(&mut self.buffer);
156        self.crc.update(&data);
157        self.total_in += data.len() as u64;
158        let mut compressed = Vec::new();
159        self.deflater
160            .deflate_sync(&data, &mut compressed)
161            .map_err(|e| io::Error::other(e.to_string()))?;
162        if let Some(ref mut w) = self.inner {
163            w.write_all(&compressed)?;
164        }
165        Ok(())
166    }
167
168    /// Full flush: same as sync flush, then reset the LZ77 state.
169    ///
170    /// After a full flush the compressor state is reset so subsequent
171    /// data can be decompressed independently (given knowledge of the
172    /// block boundary).
173    pub fn full_flush(&mut self) -> io::Result<()> {
174        self.sync_flush()?;
175        self.deflater.reset_lz77();
176        Ok(())
177    }
178
179    /// Partial flush: emit a compressed block without the sync marker.
180    ///
181    /// The block is flushed to a byte boundary but no empty stored block
182    /// is appended. This produces slightly smaller output than sync flush
183    /// but the decoder cannot determine a safe decompression boundary.
184    pub fn partial_flush(&mut self) -> io::Result<()> {
185        self.ensure_header()?;
186        let data = std::mem::take(&mut self.buffer);
187        self.crc.update(&data);
188        self.total_in += data.len() as u64;
189        let mut compressed = Vec::new();
190        self.deflater
191            .deflate_partial(&data, &mut compressed)
192            .map_err(|e| io::Error::other(e.to_string()))?;
193        if let Some(ref mut w) = self.inner {
194            w.write_all(&compressed)?;
195        }
196        Ok(())
197    }
198
199    /// Finish compression and return the inner writer.
200    ///
201    /// This **must** be called to write the GZIP trailer (CRC-32 + ISIZE).
202    ///
203    /// # Errors
204    ///
205    /// Returns an [`io::Error`] if compression or writing to the inner writer
206    /// fails.
207    pub fn finish(mut self) -> io::Result<W> {
208        if !self.finished {
209            self.ensure_header()?;
210            // Flush any remaining buffered data as a final DEFLATE block.
211            let data = std::mem::take(&mut self.buffer);
212            self.crc.update(&data);
213            self.total_in += data.len() as u64;
214            let mut compressed = Vec::new();
215            self.deflater
216                .deflate(&data, &mut compressed, true)
217                .map_err(|e| io::Error::other(e.to_string()))?;
218            if let Some(ref mut w) = self.inner {
219                w.write_all(&compressed)?;
220            }
221            // Write trailer: CRC-32 (4 bytes LE) + ISIZE (4 bytes LE).
222            let crc_val = self.crc.clone().finalize();
223            let isize_val = (self.total_in & 0xFFFF_FFFF) as u32;
224            if let Some(ref mut w) = self.inner {
225                w.write_all(&crc_val.to_le_bytes())?;
226                w.write_all(&isize_val.to_le_bytes())?;
227            }
228            self.finished = true;
229        }
230        self.inner
231            .take()
232            .ok_or_else(|| io::Error::other("inner writer already taken"))
233    }
234
235    /// If the buffer has reached `block_size`, sync-flush it.
236    fn maybe_flush_block(&mut self) -> io::Result<()> {
237        if self.buffer.len() >= self.block_size {
238            self.sync_flush()?;
239        }
240        Ok(())
241    }
242
243    /// Returns the number of uncompressed bytes currently buffered.
244    pub fn buffered_bytes(&self) -> usize {
245        self.buffer.len()
246    }
247
248    /// Returns `true` if `finish` has already been called.
249    pub fn is_finished(&self) -> bool {
250        self.finished
251    }
252}
253
254impl<W: Write> Write for GzipStreamEncoder<W> {
255    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
256        if self.finished {
257            return Err(io::Error::other("encoder already finished"));
258        }
259        self.buffer.extend_from_slice(buf);
260        self.maybe_flush_block()?;
261        Ok(buf.len())
262    }
263
264    fn flush(&mut self) -> io::Result<()> {
265        if !self.buffer.is_empty() {
266            self.sync_flush()?;
267        }
268        if let Some(ref mut w) = self.inner {
269            w.flush()?;
270        }
271        Ok(())
272    }
273}
274
275// ---------------------------------------------------------------------------
276// GzipStreamDecoder
277// ---------------------------------------------------------------------------
278
/// Streaming GZIP decoder that implements [`Read`].
///
/// All compressed data is read eagerly from the inner reader on the first
/// `read` call, decompressed into an internal buffer, and then served from
/// that buffer for subsequent reads.
///
/// Supports concatenated GZIP members: each member is decompressed
/// independently and the results are concatenated.
pub struct GzipStreamDecoder<R: Read> {
    /// The wrapped reader providing compressed input.
    inner: R,
    /// Decompressed bytes of all members, filled in one go by `fill_buffer`.
    output_buffer: Vec<u8>,
    /// Index of the next byte in `output_buffer` to hand out from `read`.
    output_pos: usize,
    /// Set once the inner reader has been drained and decoded (or was empty).
    finished: bool,
}
297
298impl<R: Read> GzipStreamDecoder<R> {
299    /// Create a new streaming GZIP decoder wrapping `reader`.
300    pub fn new(reader: R) -> Self {
301        Self {
302            inner: reader,
303            output_buffer: Vec::new(),
304            output_pos: 0,
305            finished: false,
306        }
307    }
308
309    /// Consume the decoder and return the inner reader.
310    pub fn into_inner(self) -> R {
311        self.inner
312    }
313
314    /// Read and decompress all compressed data from the inner reader.
315    ///
316    /// Handles concatenated GZIP members correctly by tracking exact DEFLATE
317    /// block boundaries via the inflater's bit-level parser rather than
318    /// scanning for magic bytes in the compressed payload (which would produce
319    /// false-positive splits whenever `0x1F 0x8B` appears inside DEFLATE data).
320    ///
321    /// Each GZIP member is decoded as:
322    ///   1. 10-byte fixed header (plus optional variable-length fields)
323    ///   2. DEFLATE compressed data  — consumed by `Inflater::inflate_consumed`
324    ///   3. 8-byte trailer: CRC-32 (LE) + ISIZE (LE)
325    ///
326    /// A single `BitReader` is shared across all members so the stream position
327    /// is always exact and no bytes are lost between members.
328    fn fill_buffer(&mut self) -> io::Result<()> {
329        if self.finished || self.output_pos < self.output_buffer.len() {
330            return Ok(());
331        }
332
333        let mut compressed = Vec::new();
334        self.inner.read_to_end(&mut compressed)?;
335
336        if compressed.is_empty() {
337            self.finished = true;
338            return Ok(());
339        }
340
341        let cursor = Cursor::new(compressed);
342        let mut bit_reader = BitReader::new(cursor);
343        let mut all_decompressed = Vec::new();
344
345        loop {
346            // ── 1. Peek at the first two bytes to detect GZIP magic ──────────
347            let mut magic = [0u8; 2];
348            match bit_reader.read_bytes(&mut magic) {
349                Ok(()) => {}
350                Err(oxiarc_core::error::OxiArcError::Io(ref e))
351                    if e.kind() == io::ErrorKind::UnexpectedEof =>
352                {
353                    break;
354                }
355                Err(oxiarc_core::error::OxiArcError::UnexpectedEof { .. }) => break,
356                Err(e) => return Err(io::Error::new(io::ErrorKind::InvalidData, e.to_string())),
357            }
358
359            if magic[0] != GZIP_ID1 || magic[1] != GZIP_ID2 {
360                // Trailing non-GZIP data — stop gracefully.
361                break;
362            }
363
364            // ── 2. Read the rest of the fixed 10-byte header ────────────────
365            // We already consumed 2 bytes (ID1, ID2); read the remaining 8.
366            let mut header_rest = [0u8; 8];
367            bit_reader.read_bytes(&mut header_rest).map_err(|e| {
368                io::Error::new(
369                    io::ErrorKind::InvalidData,
370                    format!("gzip header truncated: {e}"),
371                )
372            })?;
373            let cm = header_rest[0]; // compression method (byte 2)
374            let flg = header_rest[1]; // flags (byte 3)
375            // bytes 2-5: MTIME (ignored), byte 6: XFL (ignored), byte 7: OS (ignored)
376
377            if cm != GZIP_CM_DEFLATE {
378                return Err(io::Error::new(
379                    io::ErrorKind::InvalidData,
380                    format!("unsupported gzip compression method: {cm}"),
381                ));
382            }
383
384            // ── 3. Skip optional header fields based on FLG ──────────────────
385            // FEXTRA (bit 2): 2-byte XLEN followed by XLEN bytes
386            if flg & 0x04 != 0 {
387                let mut xlen_buf = [0u8; 2];
388                bit_reader.read_bytes(&mut xlen_buf).map_err(|e| {
389                    io::Error::new(
390                        io::ErrorKind::InvalidData,
391                        format!("gzip FEXTRA truncated: {e}"),
392                    )
393                })?;
394                let xlen = u16::from_le_bytes(xlen_buf) as usize;
395                let mut extra = vec![0u8; xlen];
396                bit_reader.read_bytes(&mut extra).map_err(|e| {
397                    io::Error::new(
398                        io::ErrorKind::InvalidData,
399                        format!("gzip FEXTRA data truncated: {e}"),
400                    )
401                })?;
402            }
403
404            // FNAME (bit 3): null-terminated string
405            if flg & 0x08 != 0 {
406                let mut byte = [0u8; 1];
407                loop {
408                    bit_reader.read_bytes(&mut byte).map_err(|e| {
409                        io::Error::new(
410                            io::ErrorKind::InvalidData,
411                            format!("gzip FNAME truncated: {e}"),
412                        )
413                    })?;
414                    if byte[0] == 0 {
415                        break;
416                    }
417                }
418            }
419
420            // FCOMMENT (bit 4): null-terminated comment
421            if flg & 0x10 != 0 {
422                let mut byte = [0u8; 1];
423                loop {
424                    bit_reader.read_bytes(&mut byte).map_err(|e| {
425                        io::Error::new(
426                            io::ErrorKind::InvalidData,
427                            format!("gzip FCOMMENT truncated: {e}"),
428                        )
429                    })?;
430                    if byte[0] == 0 {
431                        break;
432                    }
433                }
434            }
435
436            // FHCRC (bit 1): 2-byte header CRC16 (skip)
437            if flg & 0x02 != 0 {
438                let mut hcrc = [0u8; 2];
439                bit_reader.read_bytes(&mut hcrc).map_err(|e| {
440                    io::Error::new(
441                        io::ErrorKind::InvalidData,
442                        format!("gzip FHCRC truncated: {e}"),
443                    )
444                })?;
445            }
446
447            // ── 4. Inflate the DEFLATE payload ────────────────────────────────
448            // `inflate_consumed` reads bits from the shared BitReader and returns
449            // the decompressed data.  On return the BitReader is aligned to the
450            // next byte boundary (any intra-byte padding is skipped), so reading
451            // the 8-byte footer next is safe and exact.
452            let mut inflater = Inflater::new();
453            let (decompressed, _consumed) =
454                inflater.inflate_consumed(&mut bit_reader).map_err(|e| {
455                    io::Error::new(
456                        io::ErrorKind::InvalidData,
457                        format!("gzip deflate error: {e}"),
458                    )
459                })?;
460
461            // ── 5. Read and verify the 8-byte GZIP trailer ───────────────────
462            let mut trailer = [0u8; 8];
463            bit_reader.read_bytes(&mut trailer).map_err(|e| {
464                io::Error::new(
465                    io::ErrorKind::InvalidData,
466                    format!("gzip trailer truncated: {e}"),
467                )
468            })?;
469
470            let stored_crc = u32::from_le_bytes([trailer[0], trailer[1], trailer[2], trailer[3]]);
471            let stored_isize = u32::from_le_bytes([trailer[4], trailer[5], trailer[6], trailer[7]]);
472
473            let actual_crc = Crc32::compute(&decompressed);
474            if actual_crc != stored_crc {
475                return Err(io::Error::new(
476                    io::ErrorKind::InvalidData,
477                    format!(
478                        "gzip CRC-32 mismatch: stored {stored_crc:#010x}, computed {actual_crc:#010x}"
479                    ),
480                ));
481            }
482
483            let actual_isize = (decompressed.len() as u64 & 0xFFFF_FFFF) as u32;
484            if actual_isize != stored_isize {
485                return Err(io::Error::new(
486                    io::ErrorKind::InvalidData,
487                    format!("gzip ISIZE mismatch: stored {stored_isize}, computed {actual_isize}"),
488                ));
489            }
490
491            all_decompressed.extend_from_slice(&decompressed);
492        }
493
494        self.output_buffer = all_decompressed;
495        self.output_pos = 0;
496        self.finished = true;
497
498        Ok(())
499    }
500
501    /// Returns the total number of decompressed bytes available.
502    pub fn decompressed_size(&self) -> usize {
503        self.output_buffer.len()
504    }
505
506    /// Returns `true` if all decompressed data has been read.
507    pub fn is_finished(&self) -> bool {
508        self.finished && self.output_pos >= self.output_buffer.len()
509    }
510}
511
512impl<R: Read> Read for GzipStreamDecoder<R> {
513    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
514        self.fill_buffer()?;
515
516        let available = self.output_buffer.len() - self.output_pos;
517        if available == 0 {
518            return Ok(0);
519        }
520
521        let to_copy = buf.len().min(available);
522        buf[..to_copy]
523            .copy_from_slice(&self.output_buffer[self.output_pos..self.output_pos + to_copy]);
524        self.output_pos += to_copy;
525        Ok(to_copy)
526    }
527}
528
529// ---------------------------------------------------------------------------
530// ZlibStreamEncoder
531// ---------------------------------------------------------------------------
532
/// Streaming Zlib encoder that implements [`Write`].
///
/// Uncompressed input is staged in an internal buffer and pushed through a
/// single persistent [`Deflater`], so every flush (`sync_flush`,
/// `full_flush`, `partial_flush`) adds DEFLATE blocks to the same Zlib
/// stream. A running Adler-32 is updated each time the buffer is drained and
/// is written out as the trailer by `finish`.
///
/// **Important:** you *must* call [`finish`](ZlibStreamEncoder::finish) to
/// write the Adler-32 trailer. Dropping the encoder without calling `finish`
/// will produce an incomplete Zlib stream.
pub struct ZlibStreamEncoder<W: Write> {
    /// The wrapped writer that receives compressed output. `finish` takes it
    /// out of the `Option` and hands it back to the caller.
    inner: Option<W>,
    /// Uncompressed bytes accepted by `write` but not yet compressed.
    buffer: Vec<u8>,
    /// Persistent DEFLATE compressor shared by all flushes.
    deflater: Deflater,
    /// Running Adler-32 over all uncompressed data drained so far.
    adler: Adler32,
    /// Whether the Zlib header has been written yet.
    header_written: bool,
    /// Whether `finish` has already been called.
    finished: bool,
    /// Buffer length at which `write` triggers an automatic sync flush.
    block_size: usize,
    /// Compression level (already clamped to 0-9), used to derive the
    /// level indicator in the header.
    level: u8,
}
560
/// Compression level indicator stored in the Zlib stream header.
#[derive(Debug, Clone, Copy)]
#[repr(u8)]
enum ZlibLevel {
    /// Fastest compression.
    Fastest = 0,
    /// Fast compression.
    Fast = 1,
    /// Default compression.
    Default = 2,
    /// Maximum compression.
    Maximum = 3,
}

impl ZlibLevel {
    /// Map a DEFLATE compression level onto the header's level indicator.
    ///
    /// Levels 0-2 map to `Fastest`, 3-5 to `Fast`, 6 to `Default` and 7-9 to
    /// `Maximum`. Anything above 9 falls back to `Default`.
    fn from_level(level: u8) -> Self {
        if level <= 2 {
            Self::Fastest
        } else if level <= 5 {
            Self::Fast
        } else if level == 6 {
            Self::Default
        } else if level <= 9 {
            Self::Maximum
        } else {
            Self::Default
        }
    }
}
587
588impl<W: Write> ZlibStreamEncoder<W> {
589    /// Create a new streaming Zlib encoder wrapping `writer`.
590    ///
591    /// The `level` parameter controls the DEFLATE compression level (0-9).
592    pub fn new(writer: W, level: u8) -> Self {
593        let level = level.min(9);
594        Self {
595            inner: Some(writer),
596            buffer: Vec::new(),
597            deflater: Deflater::new(level),
598            adler: Adler32::new(),
599            header_written: false,
600            finished: false,
601            block_size: DEFAULT_BLOCK_SIZE,
602            level,
603        }
604    }
605
606    /// Set the block size used for incremental flushing.
607    ///
608    /// When the internal buffer reaches this many bytes it is automatically
609    /// flushed via sync_flush.
610    pub fn with_block_size(mut self, block_size: usize) -> Self {
611        self.block_size = block_size.max(1);
612        self
613    }
614
615    /// Write the 2-byte Zlib header if not already written.
616    fn ensure_header(&mut self) -> io::Result<()> {
617        if self.header_written {
618            return Ok(());
619        }
620        // CMF byte: CM=8 (DEFLATE), CINFO=7 (32KB window)
621        let cmf: u8 = 0x78;
622        let flevel = ZlibLevel::from_level(self.level) as u8;
623        let fdict = 0u8;
624        let fcheck = {
625            let base = (cmf as u16) * 256 + ((flevel << 6) | (fdict << 5)) as u16;
626            let remainder = base % 31;
627            if remainder == 0 {
628                0
629            } else {
630                (31 - remainder) as u8
631            }
632        };
633        let flg = (flevel << 6) | (fdict << 5) | fcheck;
634        if let Some(ref mut w) = self.inner {
635            w.write_all(&[cmf, flg])?;
636        }
637        self.header_written = true;
638        Ok(())
639    }
640
641    /// Drain the internal buffer through the deflater using a sync flush.
642    pub fn sync_flush(&mut self) -> io::Result<()> {
643        self.ensure_header()?;
644        let data = std::mem::take(&mut self.buffer);
645        self.adler.update(&data);
646        let mut compressed = Vec::new();
647        self.deflater
648            .deflate_sync(&data, &mut compressed)
649            .map_err(|e| io::Error::other(e.to_string()))?;
650        if let Some(ref mut w) = self.inner {
651            w.write_all(&compressed)?;
652        }
653        Ok(())
654    }
655
656    /// Full flush: same as sync flush, then reset the LZ77 state.
657    pub fn full_flush(&mut self) -> io::Result<()> {
658        self.sync_flush()?;
659        self.deflater.reset_lz77();
660        Ok(())
661    }
662
663    /// Partial flush: emit a compressed block without the sync marker.
664    pub fn partial_flush(&mut self) -> io::Result<()> {
665        self.ensure_header()?;
666        let data = std::mem::take(&mut self.buffer);
667        self.adler.update(&data);
668        let mut compressed = Vec::new();
669        self.deflater
670            .deflate_partial(&data, &mut compressed)
671            .map_err(|e| io::Error::other(e.to_string()))?;
672        if let Some(ref mut w) = self.inner {
673            w.write_all(&compressed)?;
674        }
675        Ok(())
676    }
677
678    /// Finish compression and return the inner writer.
679    ///
680    /// This **must** be called to write the Adler-32 trailer.
681    pub fn finish(mut self) -> io::Result<W> {
682        if !self.finished {
683            self.ensure_header()?;
684            let data = std::mem::take(&mut self.buffer);
685            self.adler.update(&data);
686            let mut compressed = Vec::new();
687            self.deflater
688                .deflate(&data, &mut compressed, true)
689                .map_err(|e| io::Error::other(e.to_string()))?;
690            if let Some(ref mut w) = self.inner {
691                w.write_all(&compressed)?;
692            }
693            // Write Adler-32 checksum (big-endian).
694            let checksum = self.adler.finish();
695            if let Some(ref mut w) = self.inner {
696                w.write_all(&checksum.to_be_bytes())?;
697            }
698            self.finished = true;
699        }
700        self.inner
701            .take()
702            .ok_or_else(|| io::Error::other("inner writer already taken"))
703    }
704
705    /// If the buffer has reached `block_size`, sync-flush it.
706    fn maybe_flush_block(&mut self) -> io::Result<()> {
707        if self.buffer.len() >= self.block_size {
708            self.sync_flush()?;
709        }
710        Ok(())
711    }
712
713    /// Returns the number of uncompressed bytes currently buffered.
714    pub fn buffered_bytes(&self) -> usize {
715        self.buffer.len()
716    }
717
718    /// Returns `true` if `finish` has already been called.
719    pub fn is_finished(&self) -> bool {
720        self.finished
721    }
722}
723
724impl<W: Write> Write for ZlibStreamEncoder<W> {
725    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
726        if self.finished {
727            return Err(io::Error::other("encoder already finished"));
728        }
729        self.buffer.extend_from_slice(buf);
730        self.maybe_flush_block()?;
731        Ok(buf.len())
732    }
733
734    fn flush(&mut self) -> io::Result<()> {
735        if !self.buffer.is_empty() {
736            self.sync_flush()?;
737        }
738        if let Some(ref mut w) = self.inner {
739            w.flush()?;
740        }
741        Ok(())
742    }
743}
744
745// ---------------------------------------------------------------------------
746// ZlibStreamDecoder
747// ---------------------------------------------------------------------------
748
/// Streaming Zlib decoder that implements [`Read`].
///
/// All compressed data is read eagerly from the inner reader on the first
/// `read` call, decompressed into an internal buffer, and then served from
/// that buffer for subsequent reads.
///
/// Supports concatenated Zlib streams: each stream is decompressed
/// independently and the results are concatenated.
pub struct ZlibStreamDecoder<R: Read> {
    /// The wrapped reader providing compressed input.
    inner: R,
    /// Decompressed bytes of all streams, filled in one go by `fill_buffer`.
    output_buffer: Vec<u8>,
    /// Index of the next unread byte in `output_buffer`.
    output_pos: usize,
    /// Set once the inner reader has been drained and decoded (or was empty).
    finished: bool,
}
767
768impl<R: Read> ZlibStreamDecoder<R> {
769    /// Create a new streaming Zlib decoder wrapping `reader`.
770    pub fn new(reader: R) -> Self {
771        Self {
772            inner: reader,
773            output_buffer: Vec::new(),
774            output_pos: 0,
775            finished: false,
776        }
777    }
778
779    /// Consume the decoder and return the inner reader.
780    pub fn into_inner(self) -> R {
781        self.inner
782    }
783
784    /// Read and decompress all compressed data from the inner reader.
785    ///
786    /// Handles concatenated Zlib streams by repeatedly decoding until all
787    /// input is consumed.
788    fn fill_buffer(&mut self) -> io::Result<()> {
789        if self.finished || self.output_pos < self.output_buffer.len() {
790            return Ok(());
791        }
792
793        let mut compressed = Vec::new();
794        self.inner.read_to_end(&mut compressed)?;
795
796        if compressed.is_empty() {
797            self.finished = true;
798            return Ok(());
799        }
800
801        // Decompress concatenated Zlib streams. A Zlib stream starts with a
802        // CMF byte where CM=8 (lower nibble). The typical CMF value is 0x78
803        // (window size = 32KB, deflate).
804        let mut all_decompressed = Vec::new();
805        let mut remaining = &compressed[..];
806
807        while !remaining.is_empty() {
808            // Validate minimum size (2-byte header + at least some data + 4-byte checksum)
809            if remaining.len() < 6 {
810                break;
811            }
812
813            // Check if this looks like a valid zlib header
814            let cmf = remaining[0];
815            let cm = cmf & 0x0F;
816            if cm != 8 {
817                break;
818            }
819            let flg = remaining[1];
820            let check = (cmf as u16) * 256 + (flg as u16);
821            if check % 31 != 0 {
822                break;
823            }
824
825            match zlib_decompress(remaining) {
826                Ok(decompressed) => {
827                    all_decompressed.extend_from_slice(&decompressed);
828                    // Successfully decoded. Since zlib_decompress consumes the
829                    // entire input, we are done.
830                    remaining = &[];
831                }
832                Err(_) => {
833                    // There might be concatenated streams. Try to find the
834                    // boundary by looking for the next valid zlib header.
835                    let mut decoded_one = false;
836                    // A minimal zlib stream is 6 bytes (2 header + empty deflate + 4 adler32).
837                    for split_pos in 6..remaining.len().saturating_sub(5) {
838                        let candidate_cmf = remaining[split_pos];
839                        let candidate_cm = candidate_cmf & 0x0F;
840                        if candidate_cm != 8 {
841                            continue;
842                        }
843                        if split_pos + 1 >= remaining.len() {
844                            continue;
845                        }
846                        let candidate_flg = remaining[split_pos + 1];
847                        let candidate_check = (candidate_cmf as u16) * 256 + (candidate_flg as u16);
848                        if candidate_check % 31 != 0 {
849                            continue;
850                        }
851                        // Looks like a valid header; try to decode the first part
852                        if let Ok(decompressed) = zlib_decompress(&remaining[..split_pos]) {
853                            all_decompressed.extend_from_slice(&decompressed);
854                            remaining = &remaining[split_pos..];
855                            decoded_one = true;
856                            break;
857                        }
858                    }
859                    if !decoded_one {
860                        return Err(io::Error::new(
861                            io::ErrorKind::InvalidData,
862                            "failed to decompress Zlib data",
863                        ));
864                    }
865                }
866            }
867        }
868
869        self.output_buffer = all_decompressed;
870        self.output_pos = 0;
871        self.finished = true;
872
873        Ok(())
874    }
875
876    /// Returns the total number of decompressed bytes available.
877    pub fn decompressed_size(&self) -> usize {
878        self.output_buffer.len()
879    }
880
881    /// Returns `true` if all decompressed data has been read.
882    pub fn is_finished(&self) -> bool {
883        self.finished && self.output_pos >= self.output_buffer.len()
884    }
885}
886
887impl<R: Read> Read for ZlibStreamDecoder<R> {
888    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
889        self.fill_buffer()?;
890
891        let available = self.output_buffer.len() - self.output_pos;
892        if available == 0 {
893            return Ok(0);
894        }
895
896        let to_copy = buf.len().min(available);
897        buf[..to_copy]
898            .copy_from_slice(&self.output_buffer[self.output_pos..self.output_pos + to_copy]);
899        self.output_pos += to_copy;
900        Ok(to_copy)
901    }
902}
903
#[cfg(test)]
mod tests {
    use super::*;

    // -----------------------------------------------------------------------
    // Shared helpers
    // -----------------------------------------------------------------------

    /// Byte pattern the sync-flush tests look for in the DEFLATE payload.
    const SYNC_MARKER: [u8; 4] = [0x00, 0x00, 0xFF, 0xFF];

    /// Compresses `data` in a single write with a fresh GZIP encoder.
    fn gzip_compress(data: &[u8], level: u8) -> Vec<u8> {
        let mut encoder = GzipStreamEncoder::new(Vec::new(), level);
        encoder.write_all(data).expect("write failed");
        encoder.finish().expect("finish failed")
    }

    /// Decompresses a complete GZIP stream into a new vector.
    fn gzip_decompress(compressed: &[u8]) -> Vec<u8> {
        let mut decoder = GzipStreamDecoder::new(compressed);
        let mut output = Vec::new();
        decoder.read_to_end(&mut output).expect("read failed");
        output
    }

    /// Compresses `data` in a single write with a fresh Zlib encoder.
    fn zlib_compress(data: &[u8], level: u8) -> Vec<u8> {
        let mut encoder = ZlibStreamEncoder::new(Vec::new(), level);
        encoder.write_all(data).expect("write failed");
        encoder.finish().expect("finish failed")
    }

    /// Decompresses a complete Zlib stream into a new vector.
    fn zlib_decompress_all(compressed: &[u8]) -> Vec<u8> {
        let mut decoder = ZlibStreamDecoder::new(compressed);
        let mut output = Vec::new();
        decoder.read_to_end(&mut output).expect("read failed");
        output
    }

    /// Drains `reader` using a scratch buffer of `chunk_len` bytes per call.
    fn read_in_chunks<R: Read>(mut reader: R, chunk_len: usize) -> Vec<u8> {
        let mut output = Vec::new();
        let mut buf = vec![0u8; chunk_len];
        loop {
            let n = reader.read(&mut buf).expect("read failed");
            if n == 0 {
                break;
            }
            output.extend_from_slice(&buf[..n]);
        }
        output
    }

    /// Returns `true` when `payload` contains [`SYNC_MARKER`].
    fn contains_sync_marker(payload: &[u8]) -> bool {
        payload.windows(4).any(|w| w == SYNC_MARKER)
    }

    // -----------------------------------------------------------------------
    // GzipStreamEncoder tests
    // -----------------------------------------------------------------------

    #[test]
    fn test_gzip_stream_encoder_basic() {
        let compressed = gzip_compress(b"Hello, GZIP!", 6);
        assert!(!compressed.is_empty());
        // Should start with GZIP magic bytes
        assert_eq!(compressed[0], 0x1f);
        assert_eq!(compressed[1], 0x8b);
    }

    #[test]
    fn test_gzip_stream_encoder_empty() {
        let encoder = GzipStreamEncoder::new(Vec::new(), 6);
        let compressed = encoder.finish().expect("finish failed");
        // Should produce a valid (minimal) GZIP member.
        assert!(!compressed.is_empty());
        assert_eq!(compressed[0], 0x1f);
        assert_eq!(compressed[1], 0x8b);
    }

    #[test]
    fn test_gzip_stream_roundtrip() {
        let original = b"The quick brown fox jumps over the lazy dog.";
        let compressed = gzip_compress(original, 6);
        assert_eq!(gzip_decompress(&compressed), original.as_slice());
    }

    #[test]
    fn test_gzip_stream_roundtrip_multiple_writes() {
        let parts: &[&[u8]] = &[b"Hello, ", b"streaming ", b"GZIP!"];

        let mut encoder = GzipStreamEncoder::new(Vec::new(), 6);
        for part in parts {
            encoder.write_all(part).expect("write failed");
        }
        let compressed = encoder.finish().expect("finish failed");

        assert_eq!(gzip_decompress(&compressed), b"Hello, streaming GZIP!");
    }

    #[test]
    fn test_gzip_stream_decoder_small_reads() {
        let original = b"ABCDEFGHIJ";
        let compressed = gzip_compress(original, 6);

        // Read through a 3-byte buffer so the output spans several `read` calls.
        let decoder = GzipStreamDecoder::new(&compressed[..]);
        let output = read_in_chunks(decoder, 3);

        assert_eq!(output, original.as_slice());
    }

    #[test]
    fn test_gzip_stream_decoder_empty_input() {
        let mut decoder = GzipStreamDecoder::new(&[][..]);
        let mut buf = [0u8; 16];
        let n = decoder.read(&mut buf).expect("read failed");
        assert_eq!(n, 0);
    }

    #[test]
    fn test_gzip_stream_encoder_buffered_bytes() {
        let mut encoder = GzipStreamEncoder::new(Vec::new(), 6);
        assert_eq!(encoder.buffered_bytes(), 0);
        encoder.write_all(b"12345").expect("write failed");
        assert_eq!(encoder.buffered_bytes(), 5);
        encoder.write_all(b"67890").expect("write failed");
        assert_eq!(encoder.buffered_bytes(), 10);
    }

    #[test]
    fn test_gzip_stream_encoder_is_finished() {
        let mut encoder = GzipStreamEncoder::new(Vec::new(), 6);
        assert!(!encoder.is_finished());
        encoder.write_all(b"data").expect("write failed");
        assert!(!encoder.is_finished());
    }

    #[test]
    fn test_gzip_stream_decoder_is_finished() {
        let compressed = gzip_compress(b"short", 6);

        let mut decoder = GzipStreamDecoder::new(&compressed[..]);
        assert!(!decoder.is_finished());

        let mut out = Vec::new();
        decoder.read_to_end(&mut out).expect("read failed");
        assert!(decoder.is_finished());
    }

    #[test]
    fn test_gzip_stream_roundtrip_large_data() {
        let original: Vec<u8> = (0..10_000).map(|i| (i % 256) as u8).collect();
        let compressed = gzip_compress(&original, 6);
        assert_eq!(gzip_decompress(&compressed), original);
    }

    #[test]
    fn test_gzip_stream_all_levels() {
        let original = b"AAAAAAAAAAAAAAAAAABBBBBBBBBBBBBBBBCCCCCCCCCCCCCCCC";
        for level in 0u8..=9 {
            let compressed = gzip_compress(original, level);
            assert_eq!(
                gzip_decompress(&compressed),
                original.as_slice(),
                "roundtrip failed at level {}",
                level,
            );
        }
    }

    #[test]
    fn test_gzip_stream_decoder_into_inner() {
        let data = vec![1u8, 2, 3, 4, 5];
        let decoder = GzipStreamDecoder::new(data.as_slice());
        let inner = decoder.into_inner();
        assert_eq!(inner, data.as_slice());
    }

    #[test]
    fn test_gzip_stream_flush() {
        let mut encoder = GzipStreamEncoder::new(Vec::new(), 6);
        encoder
            .write_all(b"data before flush")
            .expect("write failed");
        encoder.flush().expect("flush failed");
        assert_eq!(encoder.buffered_bytes(), 0);
        encoder.write_all(b" more data").expect("write failed");
        let compressed = encoder.finish().expect("finish failed");

        // The output is a single GZIP stream (not concatenated members).
        assert_eq!(gzip_decompress(&compressed), b"data before flush more data");
    }

    #[test]
    fn test_gzip_stream_with_block_size() {
        // Use a very small block size to force multiple flushes.
        let mut encoder = GzipStreamEncoder::new(Vec::new(), 6).with_block_size(10);
        encoder
            .write_all(b"This is more than ten bytes of data")
            .expect("write failed");
        // After writing 35 bytes with block_size=10, there should have been
        // at least one automatic flush. The buffer should hold the remainder.
        assert!(encoder.buffered_bytes() < 35);
        let compressed = encoder.finish().expect("finish failed");

        assert_eq!(
            gzip_decompress(&compressed),
            b"This is more than ten bytes of data"
        );
    }

    // -----------------------------------------------------------------------
    // ZlibStreamEncoder tests
    // -----------------------------------------------------------------------

    #[test]
    fn test_zlib_stream_encoder_basic() {
        let compressed = zlib_compress(b"Hello, Zlib!", 6);
        assert!(!compressed.is_empty());
        // Should start with zlib CMF byte 0x78
        assert_eq!(compressed[0], 0x78);
    }

    #[test]
    fn test_zlib_stream_encoder_empty() {
        let encoder = ZlibStreamEncoder::new(Vec::new(), 6);
        let compressed = encoder.finish().expect("finish failed");
        assert!(!compressed.is_empty());
        assert_eq!(compressed[0], 0x78);
    }

    #[test]
    fn test_zlib_stream_roundtrip() {
        let original = b"The quick brown fox jumps over the lazy dog.";
        let compressed = zlib_compress(original, 6);
        assert_eq!(zlib_decompress_all(&compressed), original.as_slice());
    }

    #[test]
    fn test_zlib_stream_roundtrip_multiple_writes() {
        let parts: &[&[u8]] = &[b"Hello, ", b"streaming ", b"Zlib!"];

        let mut encoder = ZlibStreamEncoder::new(Vec::new(), 6);
        for part in parts {
            encoder.write_all(part).expect("write failed");
        }
        let compressed = encoder.finish().expect("finish failed");

        assert_eq!(zlib_decompress_all(&compressed), b"Hello, streaming Zlib!");
    }

    #[test]
    fn test_zlib_stream_decoder_small_reads() {
        let original = b"ABCDEFGHIJ";
        let compressed = zlib_compress(original, 6);

        // Read through a 3-byte buffer so the output spans several `read` calls.
        let decoder = ZlibStreamDecoder::new(&compressed[..]);
        let output = read_in_chunks(decoder, 3);

        assert_eq!(output, original.as_slice());
    }

    #[test]
    fn test_zlib_stream_decoder_empty_input() {
        let mut decoder = ZlibStreamDecoder::new(&[][..]);
        let mut buf = [0u8; 16];
        let n = decoder.read(&mut buf).expect("read failed");
        assert_eq!(n, 0);
    }

    #[test]
    fn test_zlib_stream_encoder_buffered_bytes() {
        let mut encoder = ZlibStreamEncoder::new(Vec::new(), 6);
        assert_eq!(encoder.buffered_bytes(), 0);
        encoder.write_all(b"12345").expect("write failed");
        assert_eq!(encoder.buffered_bytes(), 5);
    }

    #[test]
    fn test_zlib_stream_decoder_is_finished() {
        let compressed = zlib_compress(b"short", 6);

        let mut decoder = ZlibStreamDecoder::new(&compressed[..]);
        assert!(!decoder.is_finished());

        let mut out = Vec::new();
        decoder.read_to_end(&mut out).expect("read failed");
        assert!(decoder.is_finished());
    }

    #[test]
    fn test_zlib_stream_roundtrip_large_data() {
        let original: Vec<u8> = (0..10_000).map(|i| (i % 256) as u8).collect();
        let compressed = zlib_compress(&original, 6);
        assert_eq!(zlib_decompress_all(&compressed), original);
    }

    #[test]
    fn test_zlib_stream_all_levels() {
        let original = b"AAAAAAAAAAAAAAAAAABBBBBBBBBBBBBBBBCCCCCCCCCCCCCCCC";
        for level in 0u8..=9 {
            let compressed = zlib_compress(original, level);
            assert_eq!(
                zlib_decompress_all(&compressed),
                original.as_slice(),
                "roundtrip failed at level {}",
                level,
            );
        }
    }

    #[test]
    fn test_zlib_stream_decoder_into_inner() {
        let data = vec![1u8, 2, 3, 4, 5];
        let decoder = ZlibStreamDecoder::new(data.as_slice());
        let inner = decoder.into_inner();
        assert_eq!(inner, data.as_slice());
    }

    #[test]
    fn test_zlib_stream_flush() {
        let mut encoder = ZlibStreamEncoder::new(Vec::new(), 6);
        encoder
            .write_all(b"data before flush")
            .expect("write failed");
        encoder.flush().expect("flush failed");
        assert_eq!(encoder.buffered_bytes(), 0);
        encoder.write_all(b" more data").expect("write failed");
        let compressed = encoder.finish().expect("finish failed");

        // Single zlib stream, not concatenated.
        assert_eq!(
            zlib_decompress_all(&compressed),
            b"data before flush more data"
        );
    }

    #[test]
    fn test_zlib_stream_with_block_size() {
        let mut encoder = ZlibStreamEncoder::new(Vec::new(), 6).with_block_size(10);
        encoder
            .write_all(b"This is more than ten bytes of data")
            .expect("write failed");
        assert!(encoder.buffered_bytes() < 35);
        let compressed = encoder.finish().expect("finish failed");

        assert_eq!(
            zlib_decompress_all(&compressed),
            b"This is more than ten bytes of data"
        );
    }

    // -----------------------------------------------------------------------
    // Cross-format sanity
    // -----------------------------------------------------------------------

    #[test]
    fn test_gzip_and_zlib_produce_different_output() {
        let original = b"Same input for both formats";

        let gzip_out = gzip_compress(original, 6);
        let zlib_out = zlib_compress(original, 6);

        // Different framing means different output.
        assert_ne!(gzip_out, zlib_out);

        // But both decompress to the same thing.
        assert_eq!(gzip_decompress(&gzip_out), original.as_slice());
        assert_eq!(zlib_decompress_all(&zlib_out), original.as_slice());
    }

    /// Writing after `flush` must keep working, because flushing does not end
    /// the stream.
    ///
    /// This test was previously named
    /// `test_stream_encoder_write_after_finish_errors`, although it never
    /// exercised a write after `finish`; the name now matches what is checked.
    #[test]
    fn test_stream_encoder_write_after_flush_succeeds() {
        let mut encoder = GzipStreamEncoder::new(Vec::new(), 6);
        encoder.write_all(b"first").expect("write failed");
        encoder.flush().expect("flush failed");
        // Can still write after flush (flush != finish)
        encoder.write_all(b"second").expect("write failed");
        let compressed = encoder.finish().expect("finish failed");

        // Both writes must end up in the finished stream, in order.
        assert_eq!(gzip_decompress(&compressed), b"firstsecond");
    }

    // -----------------------------------------------------------------------
    // Flush mode tests
    // -----------------------------------------------------------------------

    #[test]
    fn test_gzip_sync_flush_produces_decompressible_prefix() {
        let mut encoder = GzipStreamEncoder::new(Vec::new(), 6);
        encoder
            .write_all(b"Hello, sync flush world!")
            .expect("write failed");
        encoder.sync_flush().expect("sync_flush failed");
        // After sync flush the buffer should be empty.
        assert_eq!(encoder.buffered_bytes(), 0);
        // Write more data and finish.
        encoder
            .write_all(b" And more data after sync.")
            .expect("write failed");
        let compressed = encoder.finish().expect("finish failed");

        assert_eq!(
            gzip_decompress(&compressed),
            b"Hello, sync flush world! And more data after sync."
        );
    }

    #[test]
    fn test_gzip_full_flush_resets_state() {
        let mut encoder = GzipStreamEncoder::new(Vec::new(), 6);
        encoder
            .write_all(b"Data before full flush.")
            .expect("write failed");
        encoder.full_flush().expect("full_flush failed");
        assert_eq!(encoder.buffered_bytes(), 0);
        encoder
            .write_all(b" Data after full flush.")
            .expect("write failed");
        let compressed = encoder.finish().expect("finish failed");

        assert_eq!(
            gzip_decompress(&compressed),
            b"Data before full flush. Data after full flush."
        );
    }

    #[test]
    fn test_gzip_partial_flush() {
        let mut encoder = GzipStreamEncoder::new(Vec::new(), 6);
        encoder
            .write_all(b"Partial flush data.")
            .expect("write failed");
        encoder.partial_flush().expect("partial_flush failed");
        assert_eq!(encoder.buffered_bytes(), 0);
        encoder
            .write_all(b" More data after partial.")
            .expect("write failed");
        let compressed = encoder.finish().expect("finish failed");

        assert_eq!(
            gzip_decompress(&compressed),
            b"Partial flush data. More data after partial."
        );
    }

    #[test]
    fn test_zlib_sync_flush_produces_decompressible_prefix() {
        let mut encoder = ZlibStreamEncoder::new(Vec::new(), 6);
        encoder
            .write_all(b"Hello, zlib sync flush!")
            .expect("write failed");
        encoder.sync_flush().expect("sync_flush failed");
        assert_eq!(encoder.buffered_bytes(), 0);
        encoder
            .write_all(b" More zlib data.")
            .expect("write failed");
        let compressed = encoder.finish().expect("finish failed");

        assert_eq!(
            zlib_decompress_all(&compressed),
            b"Hello, zlib sync flush! More zlib data."
        );
    }

    #[test]
    fn test_zlib_full_flush_resets_state() {
        let mut encoder = ZlibStreamEncoder::new(Vec::new(), 6);
        encoder
            .write_all(b"Zlib before full flush.")
            .expect("write failed");
        encoder.full_flush().expect("full_flush failed");
        encoder
            .write_all(b" Zlib after full flush.")
            .expect("write failed");
        let compressed = encoder.finish().expect("finish failed");

        assert_eq!(
            zlib_decompress_all(&compressed),
            b"Zlib before full flush. Zlib after full flush."
        );
    }

    #[test]
    fn test_zlib_partial_flush() {
        let mut encoder = ZlibStreamEncoder::new(Vec::new(), 6);
        encoder
            .write_all(b"Zlib partial flush.")
            .expect("write failed");
        encoder.partial_flush().expect("partial_flush failed");
        encoder
            .write_all(b" More zlib data after partial.")
            .expect("write failed");
        let compressed = encoder.finish().expect("finish failed");

        assert_eq!(
            zlib_decompress_all(&compressed),
            b"Zlib partial flush. More zlib data after partial."
        );
    }

    #[test]
    fn test_gzip_multiple_flush_write_cycles() {
        let mut encoder = GzipStreamEncoder::new(Vec::new(), 6);
        let mut expected = Vec::new();

        for i in 0..5 {
            let chunk = format!("Chunk {} data. ", i);
            expected.extend_from_slice(chunk.as_bytes());
            encoder.write_all(chunk.as_bytes()).expect("write failed");

            // Alternate between flush types.
            match i % 3 {
                0 => encoder.sync_flush().expect("sync_flush failed"),
                1 => encoder.full_flush().expect("full_flush failed"),
                _ => encoder.partial_flush().expect("partial_flush failed"),
            }
        }

        let compressed = encoder.finish().expect("finish failed");

        assert_eq!(gzip_decompress(&compressed), expected);
    }

    #[test]
    fn test_zlib_multiple_flush_write_cycles() {
        let mut encoder = ZlibStreamEncoder::new(Vec::new(), 6);
        let mut expected = Vec::new();

        for i in 0..5 {
            let chunk = format!("ZChunk {} data. ", i);
            expected.extend_from_slice(chunk.as_bytes());
            encoder.write_all(chunk.as_bytes()).expect("write failed");

            // Alternate between flush types.
            match i % 3 {
                0 => encoder.sync_flush().expect("sync_flush failed"),
                1 => encoder.full_flush().expect("full_flush failed"),
                _ => encoder.partial_flush().expect("partial_flush failed"),
            }
        }

        let compressed = encoder.finish().expect("finish failed");

        assert_eq!(zlib_decompress_all(&compressed), expected);
    }

    #[test]
    fn test_gzip_sync_flush_marker_present() {
        // After a sync flush, the output should contain the sync marker
        // bytes 0x00 0x00 0xFF 0xFF somewhere in the DEFLATE payload.
        let mut encoder = GzipStreamEncoder::new(Vec::new(), 6);
        encoder
            .write_all(b"test sync marker")
            .expect("write failed");
        encoder.sync_flush().expect("sync_flush failed");
        // Finish without more data to get the full compressed output.
        let compressed = encoder.finish().expect("finish failed");

        // Search for the sync marker pattern in the compressed stream.
        // Skip the 10-byte GZIP header.
        assert!(
            contains_sync_marker(&compressed[10..]),
            "Sync flush marker not found in GZIP payload"
        );
    }

    #[test]
    fn test_zlib_sync_flush_marker_present() {
        let mut encoder = ZlibStreamEncoder::new(Vec::new(), 6);
        encoder
            .write_all(b"test zlib sync marker")
            .expect("write failed");
        encoder.sync_flush().expect("sync_flush failed");
        let compressed = encoder.finish().expect("finish failed");

        // Skip the 2-byte zlib header.
        assert!(
            contains_sync_marker(&compressed[2..]),
            "Sync flush marker not found in Zlib payload"
        );
    }

    #[test]
    fn test_gzip_partial_flush_no_sync_marker() {
        // Partial flush should NOT produce the sync marker.
        let mut encoder = GzipStreamEncoder::new(Vec::new(), 6);
        encoder
            .write_all(b"test partial no marker")
            .expect("write failed");
        encoder.partial_flush().expect("partial_flush failed");
        // Write the finish block to complete the stream.
        let compressed = encoder.finish().expect("finish failed");

        // Check that the sync marker does NOT appear in the deflate
        // section *before* the final block. We check the entire payload
        // for the marker pattern (which should not appear from partial flush).
        // The final block may contain 0x00 0x00 0xFF 0xFF coincidentally,
        // but a partial flush specifically avoids emitting the empty stored block.
        let payload = &compressed[10..compressed.len() - 8]; // skip header and trailer
        assert!(
            !contains_sync_marker(payload),
            "Partial flush should not produce sync marker"
        );
    }
}
1571}