Skip to main content

oxiarc_snappy/
frame.rs

1//! Snappy framed format (streaming) encoder and decoder.
2//!
3//! The Snappy framed format wraps raw Snappy blocks with:
4//! - A stream identifier chunk at the start
5//! - Compressed or uncompressed data chunks with CRC32C checksums
6//! - Maximum uncompressed chunk size of 65536 bytes
7//!
8//! This module provides `FrameEncoder` (compression) and `FrameDecoder`
9//! (decompression) that implement `Write` and `Read` respectively.
10
11use std::io::{self, Read, Write};
12use std::sync::Arc;
13
14use oxiarc_core::cancel::CancellationToken;
15use oxiarc_core::progress::ProgressHandle;
16
17use crate::compress;
18use crate::crc32c::{crc32c, masked_crc32c};
19use crate::decompress;
20use crate::error::SnappyError;
21use crate::pool::{PoolInner, SnappyPool};
22
/// Complete stream identifier chunk as it appears on the wire: the chunk
/// header (type 0xFF, 3-byte little-endian length 6) followed by the ASCII
/// magic "sNaPpY" (0xff 0x06 0x00 0x00 0x73 0x4e 0x61 0x50 0x70 0x59).
const STREAM_IDENTIFIER: [u8; 10] = [0xFF, 0x06, 0x00, 0x00, 0x73, 0x4E, 0x61, 0x50, 0x70, 0x59];

/// OxiArc dictionary-frame skippable chunk type.
/// Skippable chunks are 0x80..=0xFE.  We use 0xFE to identify the dict info.
const CHUNK_TYPE_OXIARC_DICT: u8 = 0xFE;

/// Magic prefix for the OxiArc dict info chunk body.
const OXIARC_DICT_MAGIC: &[u8] = b"OXIAD";

/// The "sNaPpY" body of the stream identifier (without the chunk header),
/// i.e. the last six bytes of `STREAM_IDENTIFIER`.
const STREAM_BODY: [u8; 6] = [0x73, 0x4E, 0x61, 0x50, 0x70, 0x59];

/// Chunk type: compressed data
const CHUNK_TYPE_COMPRESSED: u8 = 0x00;

/// Chunk type: uncompressed data
const CHUNK_TYPE_UNCOMPRESSED: u8 = 0x01;

/// Chunk type: stream identifier
const CHUNK_TYPE_STREAM_ID: u8 = 0xFF;

/// Maximum uncompressed chunk size (64 KiB).
const MAX_UNCOMPRESSED_CHUNK_SIZE: usize = 65536;
47
/// Snappy framed format encoder.
///
/// Wraps a writer and compresses data written to it using the Snappy
/// framed format. Data is buffered internally and flushed as complete
/// chunks of at most `MAX_UNCOMPRESSED_CHUNK_SIZE` uncompressed bytes.
///
/// # Example
/// ```
/// use oxiarc_snappy::FrameEncoder;
/// use std::io::Write;
///
/// let mut compressed = Vec::new();
/// {
///     let mut encoder = FrameEncoder::new(&mut compressed);
///     encoder.write_all(b"Hello, World!").unwrap();
///     encoder.finish().unwrap();
/// }
/// ```
pub struct FrameEncoder<W: Write> {
    /// Wrapped writer; becomes `None` once `finish` has taken it back out.
    inner: Option<W>,
    /// Staging area for uncompressed input that has not yet been emitted
    /// as a chunk.
    buffer: Vec<u8>,
    /// Whether the stream identifier step has already run for this encoder.
    header_written: bool,
    /// Optional progress sink; receives cumulative bytes processed per chunk.
    progress: Option<ProgressHandle>,
    /// Optional cancellation token; checked before each chunk is written.
    cancel: Option<CancellationToken>,
    /// Cumulative uncompressed bytes that have been encoded into chunks.
    bytes_processed: u64,
    /// Optional shared memory pool for buffer reuse.
    pool: Option<SnappyPool>,
}
79
80impl<W: Write> FrameEncoder<W> {
81    /// Create a new framed encoder wrapping the given writer.
82    ///
83    /// The stream identifier chunk will be written on the first `write` call.
84    pub fn new(inner: W) -> Self {
85        Self {
86            inner: Some(inner),
87            buffer: Vec::with_capacity(MAX_UNCOMPRESSED_CHUNK_SIZE),
88            header_written: false,
89            progress: None,
90            cancel: None,
91            bytes_processed: 0,
92            pool: None,
93        }
94    }
95
96    /// Create a new framed encoder that reuses scratch buffers from `pool`.
97    ///
98    /// All other behaviour is identical to [`FrameEncoder::new`].
99    pub fn with_pool(inner: W, pool: &SnappyPool) -> Self {
100        let mut enc = Self::new(inner);
101        enc.pool = Some(pool.clone());
102        enc
103    }
104
105    /// Attach a progress sink that will receive `on_progress` callbacks once
106    /// per encoded chunk.
107    pub fn with_progress(mut self, handle: ProgressHandle) -> Self {
108        self.progress = Some(handle);
109        self
110    }
111
112    /// Attach a cancellation token.  The token is checked before each chunk
113    /// is written; if it has been cancelled the operation returns an I/O
114    /// error with the message `"operation cancelled"`.
115    pub fn with_cancel(mut self, token: CancellationToken) -> Self {
116        self.cancel = Some(token);
117        self
118    }
119
120    /// Finish encoding and return the underlying writer.
121    ///
122    /// This flushes any remaining buffered data as a final chunk.
123    ///
124    /// # Errors
125    /// Returns an I/O error if writing fails.
126    pub fn finish(mut self) -> io::Result<W> {
127        self.flush_buffer()?;
128        self.inner
129            .take()
130            .ok_or_else(|| io::Error::other("encoder already finished"))
131    }
132
133    /// Write the stream identifier if it hasn't been written yet.
134    fn ensure_header(&mut self) -> io::Result<()> {
135        if !self.header_written {
136            if let Some(ref mut w) = self.inner {
137                w.write_all(&STREAM_IDENTIFIER)?;
138            }
139            self.header_written = true;
140        }
141        Ok(())
142    }
143
144    /// Flush the internal buffer as a compressed chunk.
145    fn flush_buffer(&mut self) -> io::Result<()> {
146        if self.buffer.is_empty() {
147            return Ok(());
148        }
149
150        self.ensure_header()?;
151
152        // Check cancellation before committing the chunk.
153        if let Some(ref token) = self.cancel {
154            token
155                .check()
156                .map_err(|_| io::Error::other("operation cancelled"))?;
157        }
158
159        let chunk_len = self.buffer.len() as u64;
160
161        // Swap the full staging buffer out so we can pass a slice to write_chunk
162        // without holding a conflicting borrow on `self`.  When a pool is active,
163        // the replacement buffer is acquired from the pool (preserving capacity);
164        // otherwise we use the standard `mem::take` path.
165        let data = if let Some(ref snappy_pool) = self.pool {
166            let pi = &snappy_pool.inner;
167            let mut replacement = {
168                let mut guard = pi.encoder_scratch.lock().unwrap_or_else(|e| e.into_inner());
169                if let Some(mut b) = guard.pop() {
170                    pi.encoder_scratch_hits
171                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
172                    b.clear();
173                    b
174                } else {
175                    pi.encoder_scratch_allocs
176                        .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
177                    Vec::with_capacity(crate::pool::ENCODER_SCRATCH_CAP)
178                }
179            };
180            // Swap: `replacement` (empty, capacity preserved) becomes new staging;
181            // `self.buffer` (full data) is returned as `data`.
182            std::mem::swap(&mut self.buffer, &mut replacement);
183            replacement
184        } else {
185            std::mem::take(&mut self.buffer)
186        };
187
188        self.write_chunk(&data)?;
189
190        // Return `data` to the pool after writing.
191        if let Some(ref snappy_pool) = self.pool {
192            let pi = &snappy_pool.inner;
193            let mut buf = data;
194            buf.clear();
195            let mut guard = pi.encoder_scratch.lock().unwrap_or_else(|e| e.into_inner());
196            if guard.len() < pi.cap {
197                guard.push(buf);
198            }
199        }
200
201        // Update the running total and notify the progress sink.
202        self.bytes_processed += chunk_len;
203        if let Some(ref handle) = self.progress {
204            handle.on_progress(self.bytes_processed, None);
205        }
206
207        Ok(())
208    }
209
210    /// Write a single chunk of data (must be <= MAX_UNCOMPRESSED_CHUNK_SIZE).
211    fn write_chunk(&mut self, data: &[u8]) -> io::Result<()> {
212        let writer = self
213            .inner
214            .as_mut()
215            .ok_or_else(|| io::Error::other("encoder already finished"))?;
216
217        let checksum = masked_crc32c(data);
218        let compressed = compress::compress(data);
219
220        // Use compressed format only if it actually saves space.
221        // The compressed data in a chunk includes the 4-byte checksum.
222        if compressed.len() < data.len() {
223            // Compressed chunk
224            let chunk_len = 4 + compressed.len(); // 4 bytes checksum + compressed data
225            write_chunk_header(writer, CHUNK_TYPE_COMPRESSED, chunk_len)?;
226            writer.write_all(&checksum.to_le_bytes())?;
227            writer.write_all(&compressed)?;
228        } else {
229            // Uncompressed chunk (compression didn't help)
230            let chunk_len = 4 + data.len(); // 4 bytes checksum + raw data
231            write_chunk_header(writer, CHUNK_TYPE_UNCOMPRESSED, chunk_len)?;
232            writer.write_all(&checksum.to_le_bytes())?;
233            writer.write_all(data)?;
234        }
235
236        Ok(())
237    }
238}
239
240impl<W: Write> Write for FrameEncoder<W> {
241    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
242        if buf.is_empty() {
243            return Ok(0);
244        }
245
246        self.ensure_header()?;
247
248        let mut written = 0;
249
250        while written < buf.len() {
251            let remaining_capacity = MAX_UNCOMPRESSED_CHUNK_SIZE - self.buffer.len();
252            let to_copy = remaining_capacity.min(buf.len() - written);
253
254            self.buffer
255                .extend_from_slice(&buf[written..written + to_copy]);
256            written += to_copy;
257
258            if self.buffer.len() >= MAX_UNCOMPRESSED_CHUNK_SIZE {
259                self.flush_buffer()?;
260            }
261        }
262
263        Ok(written)
264    }
265
266    fn flush(&mut self) -> io::Result<()> {
267        self.flush_buffer()?;
268        if let Some(ref mut w) = self.inner {
269            w.flush()?;
270        }
271        Ok(())
272    }
273}
274
275impl<W: Write> Drop for FrameEncoder<W> {
276    fn drop(&mut self) {
277        // Best-effort flush on drop; errors are silently ignored
278        // since we can't return them from Drop.
279        if !self.buffer.is_empty() && self.inner.is_some() {
280            let _ = self.flush_buffer();
281        }
282    }
283}
284
/// Snappy framed format decoder.
///
/// Wraps a reader and decompresses framed Snappy data read from it.
/// Chunks are decoded one at a time; the decoded bytes of the current chunk
/// are held internally and handed out through `Read::read`.
///
/// # Example
/// ```no_run
/// use oxiarc_snappy::FrameDecoder;
/// use std::io::Read;
///
/// let compressed_data: Vec<u8> = vec![];
/// let mut decoder = FrameDecoder::new(&compressed_data[..]);
/// let mut output = Vec::new();
/// decoder.read_to_end(&mut output).unwrap();
/// ```
pub struct FrameDecoder<R: Read> {
    /// Wrapped reader supplying the framed (compressed) byte stream.
    inner: R,
    /// Decoded but not yet consumed output data.
    output_buffer: Vec<u8>,
    /// Current read position within output_buffer.
    output_pos: usize,
    /// Whether the stream identifier has been validated.
    header_validated: bool,
    /// Whether we've reached the end of the stream.
    at_eof: bool,
    /// Optional progress sink; receives cumulative bytes processed per chunk.
    progress: Option<ProgressHandle>,
    /// Optional cancellation token; checked before each chunk is decoded.
    cancel: Option<CancellationToken>,
    /// Cumulative decompressed bytes that have been produced.
    bytes_processed: u64,
    /// Optional shared memory pool for scratch buffer reuse.
    pool: Option<Arc<PoolInner>>,
}
318
319impl<R: Read> FrameDecoder<R> {
320    /// Create a new framed decoder wrapping the given reader.
321    pub fn new(inner: R) -> Self {
322        Self {
323            inner,
324            output_buffer: Vec::new(),
325            output_pos: 0,
326            header_validated: false,
327            at_eof: false,
328            progress: None,
329            cancel: None,
330            bytes_processed: 0,
331            pool: None,
332        }
333    }
334
335    /// Create a new framed decoder that reuses scratch buffers from `pool`.
336    ///
337    /// All other behaviour is identical to [`FrameDecoder::new`].
338    pub fn with_pool(inner: R, pool: &SnappyPool) -> Self {
339        let mut dec = Self::new(inner);
340        dec.pool = Some(Arc::clone(&pool.inner));
341        dec
342    }
343
344    /// Attach a progress sink that will receive `on_progress` callbacks once
345    /// per decoded chunk.
346    pub fn with_progress(mut self, handle: ProgressHandle) -> Self {
347        self.progress = Some(handle);
348        self
349    }
350
351    /// Attach a cancellation token.  The token is checked before each chunk
352    /// is decoded; if it has been cancelled the operation returns an I/O
353    /// error with the message `"operation cancelled"`.
354    pub fn with_cancel(mut self, token: CancellationToken) -> Self {
355        self.cancel = Some(token);
356        self
357    }
358
359    /// Read and validate the stream identifier chunk.
360    fn validate_header(&mut self) -> io::Result<()> {
361        if self.header_validated {
362            return Ok(());
363        }
364
365        let mut header = [0u8; 10];
366        match self.inner.read_exact(&mut header) {
367            Ok(()) => {}
368            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
369                self.at_eof = true;
370                return Ok(());
371            }
372            Err(e) => return Err(e),
373        }
374
375        if header != STREAM_IDENTIFIER {
376            return Err(io::Error::new(
377                io::ErrorKind::InvalidData,
378                SnappyError::InvalidStreamIdentifier.to_string(),
379            ));
380        }
381
382        self.header_validated = true;
383        Ok(())
384    }
385
386    /// Read the next chunk from the stream and decode it into the output buffer.
387    fn read_next_chunk(&mut self) -> io::Result<bool> {
388        // Check cancellation before beginning each new chunk.
389        if let Some(ref token) = self.cancel {
390            token
391                .check()
392                .map_err(|_| io::Error::other("operation cancelled"))?;
393        }
394
395        // Read chunk header: 1 byte type + 3 bytes length (little-endian)
396        let mut chunk_header = [0u8; 4];
397        match self.inner.read_exact(&mut chunk_header) {
398            Ok(()) => {}
399            Err(e) if e.kind() == io::ErrorKind::UnexpectedEof => {
400                self.at_eof = true;
401                return Ok(false);
402            }
403            Err(e) => return Err(e),
404        }
405
406        let chunk_type = chunk_header[0];
407        let chunk_len = (chunk_header[1] as usize)
408            | ((chunk_header[2] as usize) << 8)
409            | ((chunk_header[3] as usize) << 16);
410
411        match chunk_type {
412            CHUNK_TYPE_COMPRESSED => {
413                self.read_compressed_chunk(chunk_len)?;
414                self.emit_chunk_progress();
415                Ok(true)
416            }
417            CHUNK_TYPE_UNCOMPRESSED => {
418                self.read_uncompressed_chunk(chunk_len)?;
419                self.emit_chunk_progress();
420                Ok(true)
421            }
422            CHUNK_TYPE_STREAM_ID => {
423                // Another stream identifier (valid, just skip/validate)
424                self.read_stream_identifier_chunk(chunk_len)?;
425                Ok(true)
426            }
427            0x02..=0x7F => {
428                // Reserved unskippable chunk -- error
429                Err(io::Error::new(
430                    io::ErrorKind::InvalidData,
431                    SnappyError::InvalidChunkType { chunk_type }.to_string(),
432                ))
433            }
434            _ => {
435                // 0x80..=0xFE: Skippable chunk -- skip the data
436                let mut skip_buf = vec![0u8; chunk_len];
437                self.inner.read_exact(&mut skip_buf)?;
438                Ok(true)
439            }
440        }
441    }
442
443    /// Update `bytes_processed` with the latest chunk size and notify the
444    /// progress sink.  Called after a data chunk has been placed in
445    /// `output_buffer`.
446    fn emit_chunk_progress(&mut self) {
447        let chunk_size = self.output_buffer.len() as u64;
448        self.bytes_processed += chunk_size;
449        if let Some(ref handle) = self.progress {
450            handle.on_progress(self.bytes_processed, None);
451        }
452    }
453
454    /// Acquire a scratch buffer for reading chunk data.
455    ///
456    /// Returns a `Vec<u8>` sized to `chunk_len` bytes (zeroed), either from
457    /// the pool or freshly allocated.
458    fn acquire_decoder_scratch(&mut self, chunk_len: usize) -> Vec<u8> {
459        if let Some(ref pool_inner) = self.pool {
460            let mut guard = pool_inner
461                .decoder_scratch
462                .lock()
463                .unwrap_or_else(|e| e.into_inner());
464            if let Some(mut b) = guard.pop() {
465                pool_inner
466                    .decoder_scratch_hits
467                    .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
468                b.clear();
469                b.resize(chunk_len, 0);
470                return b;
471            }
472            pool_inner
473                .decoder_scratch_allocs
474                .fetch_add(1, std::sync::atomic::Ordering::Relaxed);
475        }
476        vec![0u8; chunk_len]
477    }
478
479    /// Return a scratch buffer to the pool (no-op if no pool is active).
480    fn release_decoder_scratch(&self, mut buf: Vec<u8>) {
481        if let Some(ref pool_inner) = self.pool {
482            buf.clear();
483            let mut guard = pool_inner
484                .decoder_scratch
485                .lock()
486                .unwrap_or_else(|e| e.into_inner());
487            if guard.len() < pool_inner.cap {
488                guard.push(buf);
489            }
490        }
491    }
492
493    /// Read and decompress a compressed data chunk.
494    fn read_compressed_chunk(&mut self, chunk_len: usize) -> io::Result<()> {
495        if chunk_len < 4 {
496            return Err(io::Error::new(
497                io::ErrorKind::InvalidData,
498                "compressed chunk too short for checksum",
499            ));
500        }
501
502        let mut chunk_data = self.acquire_decoder_scratch(chunk_len);
503        self.inner.read_exact(&mut chunk_data)?;
504
505        // First 4 bytes are the masked CRC32C
506        let expected_checksum =
507            u32::from_le_bytes([chunk_data[0], chunk_data[1], chunk_data[2], chunk_data[3]]);
508
509        let compressed_data = &chunk_data[4..];
510        let decompressed = decompress::decompress(compressed_data)
511            .map_err(|e| io::Error::new(io::ErrorKind::InvalidData, e.to_string()))?;
512
513        // Return scratch buffer to pool before we replace output_buffer.
514        self.release_decoder_scratch(chunk_data);
515
516        // Verify checksum
517        let computed_checksum = masked_crc32c(&decompressed);
518        if expected_checksum != computed_checksum {
519            return Err(io::Error::new(
520                io::ErrorKind::InvalidData,
521                SnappyError::ChecksumMismatch {
522                    expected: expected_checksum,
523                    computed: computed_checksum,
524                }
525                .to_string(),
526            ));
527        }
528
529        self.output_buffer = decompressed;
530        self.output_pos = 0;
531        Ok(())
532    }
533
534    /// Read an uncompressed data chunk.
535    fn read_uncompressed_chunk(&mut self, chunk_len: usize) -> io::Result<()> {
536        if chunk_len < 4 {
537            return Err(io::Error::new(
538                io::ErrorKind::InvalidData,
539                "uncompressed chunk too short for checksum",
540            ));
541        }
542
543        let mut chunk_data = self.acquire_decoder_scratch(chunk_len);
544        self.inner.read_exact(&mut chunk_data)?;
545
546        let expected_checksum =
547            u32::from_le_bytes([chunk_data[0], chunk_data[1], chunk_data[2], chunk_data[3]]);
548
549        let data_slice = chunk_data[4..].to_vec();
550
551        self.release_decoder_scratch(chunk_data);
552
553        let computed_checksum = masked_crc32c(&data_slice);
554        if expected_checksum != computed_checksum {
555            return Err(io::Error::new(
556                io::ErrorKind::InvalidData,
557                SnappyError::ChecksumMismatch {
558                    expected: expected_checksum,
559                    computed: computed_checksum,
560                }
561                .to_string(),
562            ));
563        }
564
565        self.output_buffer = data_slice;
566        self.output_pos = 0;
567        Ok(())
568    }
569
570    /// Read and validate a stream identifier chunk body.
571    fn read_stream_identifier_chunk(&mut self, chunk_len: usize) -> io::Result<()> {
572        if chunk_len != 6 {
573            return Err(io::Error::new(
574                io::ErrorKind::InvalidData,
575                "invalid stream identifier length",
576            ));
577        }
578
579        let mut body = [0u8; 6];
580        self.inner.read_exact(&mut body)?;
581
582        if body != STREAM_BODY {
583            return Err(io::Error::new(
584                io::ErrorKind::InvalidData,
585                SnappyError::InvalidStreamIdentifier.to_string(),
586            ));
587        }
588
589        Ok(())
590    }
591}
592
593impl<R: Read> Read for FrameDecoder<R> {
594    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
595        if buf.is_empty() {
596            return Ok(0);
597        }
598
599        // Validate stream header on first read
600        if !self.header_validated && !self.at_eof {
601            self.validate_header()?;
602        }
603
604        loop {
605            if self.at_eof {
606                return Ok(0);
607            }
608
609            // If there's data in the output buffer, return it
610            let available = self.output_buffer.len() - self.output_pos;
611            if available > 0 {
612                let to_copy = available.min(buf.len());
613                buf[..to_copy].copy_from_slice(
614                    &self.output_buffer[self.output_pos..self.output_pos + to_copy],
615                );
616                self.output_pos += to_copy;
617                return Ok(to_copy);
618            }
619
620            // Try to read the next chunk
621            if !self.read_next_chunk()? {
622                return Ok(0);
623            }
624        }
625    }
626}
627
/// Write a chunk header (type byte + 3-byte little-endian length).
///
/// `data_len` is the length of the chunk body that follows the header.
///
/// # Errors
/// Returns `InvalidInput` without writing anything if `data_len` does not
/// fit in the 24-bit length field; silently truncating it would emit a
/// header that disagrees with the body. Otherwise returns whatever error the
/// writer reports.
fn write_chunk_header(writer: &mut impl Write, chunk_type: u8, data_len: usize) -> io::Result<()> {
    /// Largest value representable in the 3-byte length field.
    const MAX_CHUNK_BODY_LEN: usize = 0x00FF_FFFF;

    if data_len > MAX_CHUNK_BODY_LEN {
        return Err(io::Error::new(
            io::ErrorKind::InvalidInput,
            format!("chunk body length {data_len} exceeds the 24-bit length field"),
        ));
    }

    // Lossless: the guard above bounds `data_len` to 24 bits.
    let [b0, b1, b2, _] = (data_len as u32).to_le_bytes();
    writer.write_all(&[chunk_type, b0, b1, b2])
}
638
639/// Compress `input` using the Snappy framing format, reusing scratch buffers
640/// from `pool` to amortise per-chunk allocation costs.
641///
642/// Output is byte-for-byte compatible with the serial [`FrameEncoder`] and
643/// can be decoded by [`FrameDecoder`].
644///
645/// # Errors
646///
647/// Returns an [`io::Error`] if the internal write operations fail (in practice
648/// this only happens on allocation failures when writing to a `Vec`).
649pub fn compress_frame_pooled(input: &[u8], pool: &SnappyPool) -> io::Result<Vec<u8>> {
650    let mut output = Vec::new();
651    let mut encoder = FrameEncoder::with_pool(&mut output, pool);
652    encoder.write_all(input)?;
653    encoder.finish()?;
654    Ok(output)
655}
656
657/// Compress `input` using the Snappy framing format with a prefix dictionary.
658///
659/// Each data chunk is compressed via
660/// [`crate::compress::compress_block_with_dict`] so matches into `dict` can
661/// reduce the compressed size when the input shares substrings with the dict.
662///
663/// The output begins with a custom **OxiArc dictionary frame** chunk (chunk
664/// type `0xFE`) that embeds a CRC32C of `dict` and the dict length.
665/// [`decompress_frame_with_dict`] uses this header to detect mismatched
666/// dictionaries before attempting decompression.
667///
668/// # Format
669/// ```text
670/// [Snappy stream identifier, 10 bytes]
671/// [OxiArc dict-info chunk, chunk-type=0xFE]
672///   body: b"OXIAD" (5) | crc32c(dict) as LE u32 (4) | dict_len as LE u32 (4)
673/// [compressed data chunks, each chunk-compressed with dict]
674/// ```
675///
676/// **This format is NOT compatible with standard Snappy frame readers.**
677/// Use [`decompress_frame_with_dict`] to decode.
678///
679/// The maximum dictionary size is 64 KiB; if `dict` is longer only the last
680/// 64 KiB is used (mirroring the block-level encoder).
681pub fn compress_frame_with_dict(input: &[u8], dict: &[u8]) -> Vec<u8> {
682    // Clamp dict to the last 64 KiB.
683    let dict = if dict.len() > 65536 {
684        &dict[dict.len() - 65536..]
685    } else {
686        dict
687    };
688
689    let mut output = Vec::new();
690
691    // 1. Snappy stream identifier.
692    output.extend_from_slice(&STREAM_IDENTIFIER);
693
694    // 2. OxiArc dict-info skippable chunk.
695    //    body: b"OXIAD" (5) | crc32c(dict) LE u32 (4) | dict_len LE u32 (4) = 13 bytes.
696    let dict_crc = crc32c(dict);
697    let dict_len_u32 = dict.len() as u32;
698    let mut dict_body = Vec::with_capacity(13);
699    dict_body.extend_from_slice(OXIARC_DICT_MAGIC);
700    dict_body.extend_from_slice(&dict_crc.to_le_bytes());
701    dict_body.extend_from_slice(&dict_len_u32.to_le_bytes());
702
703    // Chunk header: [chunk_type (1)] [body_len (3 bytes LE)]
704    let body_len = dict_body.len();
705    output.push(CHUNK_TYPE_OXIARC_DICT);
706    output.push((body_len & 0xFF) as u8);
707    output.push(((body_len >> 8) & 0xFF) as u8);
708    output.push(((body_len >> 16) & 0xFF) as u8);
709    output.extend_from_slice(&dict_body);
710
711    // 3. Data chunks (standard framing but block-compressed with dict).
712    let mut src_pos = 0usize;
713    while src_pos < input.len() {
714        let chunk_end = (src_pos + MAX_UNCOMPRESSED_CHUNK_SIZE).min(input.len());
715        let chunk_data = &input[src_pos..chunk_end];
716
717        let checksum = masked_crc32c(chunk_data);
718        let compressed = compress::compress_block_with_dict(chunk_data, dict);
719
720        if compressed.len() < chunk_data.len() {
721            // Compressed chunk.
722            let chunk_len = 4 + compressed.len();
723            write_chunk_header_vec(&mut output, CHUNK_TYPE_COMPRESSED, chunk_len);
724            output.extend_from_slice(&checksum.to_le_bytes());
725            output.extend_from_slice(&compressed);
726        } else {
727            // Uncompressed chunk (compression didn't help).
728            let chunk_len = 4 + chunk_data.len();
729            write_chunk_header_vec(&mut output, CHUNK_TYPE_UNCOMPRESSED, chunk_len);
730            output.extend_from_slice(&checksum.to_le_bytes());
731            output.extend_from_slice(chunk_data);
732        }
733
734        src_pos = chunk_end;
735    }
736
737    output
738}
739
740/// Decompress data produced by [`compress_frame_with_dict`].
741///
742/// The `dict` must be identical to the one used during compression.  The
743/// OxiArc dict-info chunk embedded in the frame is validated: if the CRC32C
744/// of the supplied dict does not match the stored CRC, an `InvalidData` error
745/// is returned before any decompression is attempted.
746///
747/// **This function only processes frames produced by [`compress_frame_with_dict`].**
748/// Standard Snappy frames (without the `0xFE` dict chunk) will be rejected.
749///
750/// # Errors
751/// Returns an error if the data is malformed, truncated, or the wrong dict is supplied.
752pub fn decompress_frame_with_dict(input: &[u8], dict: &[u8]) -> Result<Vec<u8>, SnappyError> {
753    // Clamp dict to the last 64 KiB.
754    let dict = if dict.len() > 65536 {
755        &dict[dict.len() - 65536..]
756    } else {
757        dict
758    };
759
760    let mut pos = 0usize;
761
762    // 1. Read and validate the stream identifier.
763    if pos + 10 > input.len() {
764        return Err(SnappyError::UnexpectedEof {
765            context: "stream identifier",
766        });
767    }
768    if input[pos..pos + 10] != STREAM_IDENTIFIER[..] {
769        return Err(SnappyError::InvalidStreamIdentifier);
770    }
771    pos += 10;
772
773    // 2. Read and validate the OxiArc dict-info chunk.
774    if pos + 4 > input.len() {
775        return Err(SnappyError::UnexpectedEof {
776            context: "dict-info chunk header",
777        });
778    }
779    let dict_chunk_type = input[pos];
780    let dict_chunk_body_len = (input[pos + 1] as usize)
781        | ((input[pos + 2] as usize) << 8)
782        | ((input[pos + 3] as usize) << 16);
783    pos += 4;
784
785    if dict_chunk_type != CHUNK_TYPE_OXIARC_DICT {
786        return Err(SnappyError::CorruptedData {
787            message: format!(
788                "expected OxiArc dict-info chunk (0xFE), found {dict_chunk_type:#04x}"
789            ),
790        });
791    }
792
793    if dict_chunk_body_len < 13 {
794        return Err(SnappyError::CorruptedData {
795            message: format!("OxiArc dict-info chunk body too short: {dict_chunk_body_len} bytes"),
796        });
797    }
798
799    if pos + dict_chunk_body_len > input.len() {
800        return Err(SnappyError::UnexpectedEof {
801            context: "dict-info chunk body",
802        });
803    }
804
805    let dict_body = &input[pos..pos + dict_chunk_body_len];
806    pos += dict_chunk_body_len;
807
808    // Validate magic.
809    if &dict_body[..5] != OXIARC_DICT_MAGIC {
810        return Err(SnappyError::CorruptedData {
811            message: "OxiArc dict-info magic mismatch".to_string(),
812        });
813    }
814
815    let stored_crc = u32::from_le_bytes([dict_body[5], dict_body[6], dict_body[7], dict_body[8]]);
816    let stored_len =
817        u32::from_le_bytes([dict_body[9], dict_body[10], dict_body[11], dict_body[12]]) as usize;
818
819    // Validate dict CRC and length.
820    let computed_crc = crc32c(dict);
821    if computed_crc != stored_crc {
822        return Err(SnappyError::ChecksumMismatch {
823            expected: stored_crc,
824            computed: computed_crc,
825        });
826    }
827    if dict.len() != stored_len {
828        return Err(SnappyError::CorruptedData {
829            message: format!(
830                "dict length mismatch: frame has {stored_len} bytes, supplied dict is {} bytes",
831                dict.len()
832            ),
833        });
834    }
835
836    // 3. Decode data chunks.
837    let mut output = Vec::new();
838
839    while pos < input.len() {
840        if pos + 4 > input.len() {
841            return Err(SnappyError::UnexpectedEof {
842                context: "chunk header",
843            });
844        }
845        let chunk_type = input[pos];
846        let chunk_body_len = (input[pos + 1] as usize)
847            | ((input[pos + 2] as usize) << 8)
848            | ((input[pos + 3] as usize) << 16);
849        pos += 4;
850
851        if pos + chunk_body_len > input.len() {
852            return Err(SnappyError::UnexpectedEof {
853                context: "chunk body",
854            });
855        }
856
857        let chunk_body = &input[pos..pos + chunk_body_len];
858        pos += chunk_body_len;
859
860        match chunk_type {
861            CHUNK_TYPE_COMPRESSED => {
862                if chunk_body.len() < 4 {
863                    return Err(SnappyError::CorruptedData {
864                        message: "compressed chunk too short for checksum".to_string(),
865                    });
866                }
867                let expected_checksum = u32::from_le_bytes([
868                    chunk_body[0],
869                    chunk_body[1],
870                    chunk_body[2],
871                    chunk_body[3],
872                ]);
873                let compressed_payload = &chunk_body[4..];
874                let decompressed =
875                    decompress::decompress_block_with_dict(compressed_payload, dict)?;
876
877                let computed_checksum = masked_crc32c(&decompressed);
878                if expected_checksum != computed_checksum {
879                    return Err(SnappyError::ChecksumMismatch {
880                        expected: expected_checksum,
881                        computed: computed_checksum,
882                    });
883                }
884                output.extend_from_slice(&decompressed);
885            }
886            CHUNK_TYPE_UNCOMPRESSED => {
887                if chunk_body.len() < 4 {
888                    return Err(SnappyError::CorruptedData {
889                        message: "uncompressed chunk too short for checksum".to_string(),
890                    });
891                }
892                let expected_checksum = u32::from_le_bytes([
893                    chunk_body[0],
894                    chunk_body[1],
895                    chunk_body[2],
896                    chunk_body[3],
897                ]);
898                let raw_data = &chunk_body[4..];
899
900                let computed_checksum = masked_crc32c(raw_data);
901                if expected_checksum != computed_checksum {
902                    return Err(SnappyError::ChecksumMismatch {
903                        expected: expected_checksum,
904                        computed: computed_checksum,
905                    });
906                }
907                output.extend_from_slice(raw_data);
908            }
909            CHUNK_TYPE_STREAM_ID => {
910                // Ignore any additional stream identifiers.
911            }
912            0x02..=0x7F => {
913                return Err(SnappyError::InvalidChunkType { chunk_type });
914            }
915            _ => {
916                // Other skippable chunks (including 0xFE if seen again) — skip.
917            }
918        }
919    }
920
921    Ok(output)
922}
923
/// Append a 4-byte chunk header to `output`.
///
/// The header is the `chunk_type` byte followed by the low 24 bits of
/// `data_len` in little-endian order. This is the infallible counterpart used
/// by the non-streaming dict-frame helpers, which build frames in a `Vec<u8>`.
///
/// Bits of `data_len` above the low 24 are not representable in the header
/// and are dropped.
fn write_chunk_header_vec(output: &mut Vec<u8>, chunk_type: u8, data_len: usize) {
    // Only the three least-significant bytes are emitted, so narrowing to
    // `u32` first cannot change the result.
    let [len_lo, len_mid, len_hi, _] = (data_len as u32).to_le_bytes();
    output.extend_from_slice(&[chunk_type, len_lo, len_mid, len_hi]);
}
932
933#[cfg(test)]
934mod tests {
935    use super::*;
936
937    #[test]
938    fn test_frame_roundtrip_small() {
939        let data = b"Hello, World! This is a test of Snappy framing.";
940
941        let mut compressed = Vec::new();
942        {
943            let mut encoder = FrameEncoder::new(&mut compressed);
944            encoder.write_all(data).expect("write should succeed");
945            encoder.finish().expect("finish should succeed");
946        }
947
948        // Verify stream identifier is present
949        assert_eq!(&compressed[..10], &STREAM_IDENTIFIER);
950
951        let mut decoder = FrameDecoder::new(&compressed[..]);
952        let mut output = Vec::new();
953        decoder
954            .read_to_end(&mut output)
955            .expect("read should succeed");
956
957        assert_eq!(output, data);
958    }
959
960    #[test]
961    fn test_frame_roundtrip_empty() {
962        let data = b"";
963
964        let mut compressed = Vec::new();
965        {
966            let mut encoder = FrameEncoder::new(&mut compressed);
967            encoder.write_all(data).expect("write should succeed");
968            encoder.finish().expect("finish should succeed");
969        }
970
971        let mut decoder = FrameDecoder::new(&compressed[..]);
972        let mut output = Vec::new();
973        decoder
974            .read_to_end(&mut output)
975            .expect("read should succeed");
976
977        assert_eq!(output, data);
978    }
979
980    #[test]
981    fn test_frame_roundtrip_large() {
982        // Data larger than one chunk (> 64 KiB)
983        let mut data = Vec::with_capacity(100_000);
984        for i in 0..100_000u32 {
985            data.push((i % 256) as u8);
986        }
987
988        let mut compressed = Vec::new();
989        {
990            let mut encoder = FrameEncoder::new(&mut compressed);
991            encoder.write_all(&data).expect("write should succeed");
992            encoder.finish().expect("finish should succeed");
993        }
994
995        let mut decoder = FrameDecoder::new(&compressed[..]);
996        let mut output = Vec::new();
997        decoder
998            .read_to_end(&mut output)
999            .expect("read should succeed");
1000
1001        assert_eq!(output, data);
1002    }
1003
1004    #[test]
1005    fn test_frame_roundtrip_repeated() {
1006        let data = vec![0xAB; 200_000];
1007
1008        let mut compressed = Vec::new();
1009        {
1010            let mut encoder = FrameEncoder::new(&mut compressed);
1011            encoder.write_all(&data).expect("write should succeed");
1012            encoder.finish().expect("finish should succeed");
1013        }
1014
1015        // Highly repeated data should compress well
1016        assert!(compressed.len() < data.len());
1017
1018        let mut decoder = FrameDecoder::new(&compressed[..]);
1019        let mut output = Vec::new();
1020        decoder
1021            .read_to_end(&mut output)
1022            .expect("read should succeed");
1023
1024        assert_eq!(output, data);
1025    }
1026
1027    #[test]
1028    fn test_frame_incremental_write() {
1029        let data = b"Hello, this is a test of incremental writing to the encoder.";
1030
1031        let mut compressed = Vec::new();
1032        {
1033            let mut encoder = FrameEncoder::new(&mut compressed);
1034            // Write in small increments
1035            for chunk in data.chunks(5) {
1036                encoder.write_all(chunk).expect("write should succeed");
1037            }
1038            encoder.finish().expect("finish should succeed");
1039        }
1040
1041        let mut decoder = FrameDecoder::new(&compressed[..]);
1042        let mut output = Vec::new();
1043        decoder
1044            .read_to_end(&mut output)
1045            .expect("read should succeed");
1046
1047        assert_eq!(output, data);
1048    }
1049
1050    #[test]
1051    fn test_frame_incremental_read() {
1052        let data = b"Test data for incremental reading from the decoder.";
1053
1054        let mut compressed = Vec::new();
1055        {
1056            let mut encoder = FrameEncoder::new(&mut compressed);
1057            encoder.write_all(data).expect("write should succeed");
1058            encoder.finish().expect("finish should succeed");
1059        }
1060
1061        let mut decoder = FrameDecoder::new(&compressed[..]);
1062        let mut output = Vec::new();
1063        let mut buf = [0u8; 7]; // Read in small chunks
1064        loop {
1065            let n = decoder.read(&mut buf).expect("read should succeed");
1066            if n == 0 {
1067                break;
1068            }
1069            output.extend_from_slice(&buf[..n]);
1070        }
1071
1072        assert_eq!(output, data);
1073    }
1074
1075    #[test]
1076    fn test_frame_decoder_invalid_header() {
1077        let bad_data = [0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x06, 0x07, 0x08, 0x09];
1078        let mut decoder = FrameDecoder::new(&bad_data[..]);
1079        let mut output = Vec::new();
1080        let result = decoder.read_to_end(&mut output);
1081        assert!(result.is_err());
1082    }
1083
1084    #[test]
1085    fn test_frame_decoder_empty_input() {
1086        let empty: &[u8] = &[];
1087        let mut decoder = FrameDecoder::new(empty);
1088        let mut output = Vec::new();
1089        decoder
1090            .read_to_end(&mut output)
1091            .expect("empty input should succeed");
1092        assert!(output.is_empty());
1093    }
1094
1095    #[test]
1096    fn test_write_chunk_header() {
1097        let mut buf = Vec::new();
1098        write_chunk_header(&mut buf, 0x00, 0x123456).expect("should succeed");
1099        assert_eq!(buf, vec![0x00, 0x56, 0x34, 0x12]);
1100    }
1101
1102    #[test]
1103    fn test_stream_identifier_constant() {
1104        // Verify the stream identifier matches the spec
1105        assert_eq!(STREAM_IDENTIFIER[0], 0xFF); // chunk type
1106        assert_eq!(STREAM_IDENTIFIER[1], 0x06); // length low
1107        assert_eq!(STREAM_IDENTIFIER[2], 0x00); // length mid
1108        assert_eq!(STREAM_IDENTIFIER[3], 0x00); // length high
1109        assert_eq!(&STREAM_IDENTIFIER[4..], b"sNaPpY");
1110    }
1111
1112    // -----------------------------------------------------------------------
1113    // Progress-callback tests
1114    // -----------------------------------------------------------------------
1115
1116    use oxiarc_core::cancel::CancellationToken;
1117    use oxiarc_core::progress::{ProgressHandle, ProgressSink};
1118    use std::sync::{
1119        Arc,
1120        atomic::{AtomicUsize, Ordering},
1121    };
1122
1123    /// A `ProgressSink` that records call count and each `processed` value.
1124    struct CountingSink {
1125        calls: AtomicUsize,
1126    }
1127
1128    impl CountingSink {
1129        fn new() -> Self {
1130            Self {
1131                calls: AtomicUsize::new(0),
1132            }
1133        }
1134
1135        fn call_count(&self) -> usize {
1136            self.calls.load(Ordering::SeqCst)
1137        }
1138    }
1139
1140    impl ProgressSink for CountingSink {
1141        fn on_progress(&self, _processed: u64, _total: Option<u64>) {
1142            self.calls.fetch_add(1, Ordering::SeqCst);
1143        }
1144    }
1145
1146    /// A second counting sink that records all processed values for
1147    /// strict monotonicity verification.
1148    struct MonotonicSink {
1149        values: std::sync::Mutex<Vec<u64>>,
1150    }
1151
1152    impl MonotonicSink {
1153        fn new() -> Self {
1154            Self {
1155                values: std::sync::Mutex::new(Vec::new()),
1156            }
1157        }
1158
1159        fn values(&self) -> Vec<u64> {
1160            self.values
1161                .lock()
1162                .unwrap_or_else(|p| p.into_inner())
1163                .clone()
1164        }
1165    }
1166
1167    impl ProgressSink for MonotonicSink {
1168        fn on_progress(&self, processed: u64, _total: Option<u64>) {
1169            let mut guard = self.values.lock().unwrap_or_else(|p| p.into_inner());
1170            guard.push(processed);
1171        }
1172    }
1173
1174    /// Encode + decode a 128 KiB buffer and verify progress callbacks.
1175    ///
1176    /// A 128 KiB input produces exactly 2 chunks (each 64 KiB), so
1177    /// `on_progress` must be called ≥ 2 times for both encode and decode.
1178    /// Decode progress values must be strictly non-decreasing.
1179    #[test]
1180    fn test_progress_counting_sink() {
1181        // 128 KiB of pseudo-random-ish data (prevent too-aggressive compression
1182        // flattening chunk boundaries).
1183        let data: Vec<u8> = (0..131_072u64)
1184            .map(|i| (i.wrapping_mul(6_364_136_223_846_793_005_u64) >> 56) as u8)
1185            .collect();
1186
1187        // --- Encode with progress ---
1188        let enc_sink = Arc::new(CountingSink::new());
1189        let enc_handle: ProgressHandle = enc_sink.clone();
1190
1191        let mut compressed = Vec::new();
1192        {
1193            let mut encoder = FrameEncoder::new(&mut compressed).with_progress(enc_handle);
1194            encoder
1195                .write_all(&data)
1196                .expect("encode write should succeed");
1197            encoder.finish().expect("encode finish should succeed");
1198        }
1199
1200        assert!(
1201            enc_sink.call_count() >= 2,
1202            "encoder on_progress called {} times, expected >= 2",
1203            enc_sink.call_count()
1204        );
1205
1206        // --- Decode with progress (monotonicity check) ---
1207        let dec_sink = Arc::new(MonotonicSink::new());
1208        let dec_handle: ProgressHandle = dec_sink.clone();
1209
1210        let mut decoder = FrameDecoder::new(&compressed[..]).with_progress(dec_handle);
1211        let mut output = Vec::new();
1212        decoder
1213            .read_to_end(&mut output)
1214            .expect("decode should succeed");
1215
1216        assert_eq!(output, data, "decoded data must match original");
1217
1218        let values = dec_sink.values();
1219        assert!(
1220            values.len() >= 2,
1221            "decoder on_progress called {} times, expected >= 2",
1222            values.len()
1223        );
1224        // Verify monotonically non-decreasing
1225        for window in values.windows(2) {
1226            assert!(
1227                window[1] >= window[0],
1228                "progress values not monotonic: {} followed by {}",
1229                window[0],
1230                window[1]
1231            );
1232        }
1233    }
1234
1235    // -----------------------------------------------------------------------
1236    // Cancellation tests
1237    // -----------------------------------------------------------------------
1238
1239    /// A pre-cancelled token must prevent encoding a multi-chunk buffer.
1240    #[test]
1241    fn test_cancellation_encoder_pre_cancelled() {
1242        // 128 KiB buffer → 2 chunks, so cancellation must trigger.
1243        let data: Vec<u8> = vec![0xBEu8; 131_072];
1244
1245        let token = CancellationToken::new();
1246        token.cancel();
1247
1248        let mut compressed = Vec::new();
1249        let mut encoder = FrameEncoder::new(&mut compressed).with_cancel(token);
1250        // write_all may succeed (it only buffers), but finish/flush must fail.
1251        let write_result = encoder.write_all(&data);
1252        let finish_result = encoder.finish();
1253
1254        // At least one of write or finish must be Err.
1255        let triggered = write_result.is_err() || finish_result.is_err();
1256        assert!(
1257            triggered,
1258            "expected a cancellation error but neither write nor finish returned Err"
1259        );
1260    }
1261
1262    /// A pre-cancelled token must prevent decoding.
1263    #[test]
1264    fn test_cancellation_decoder_pre_cancelled() {
1265        // First encode without cancellation.
1266        let data: Vec<u8> = vec![0xCAu8; 131_072];
1267        let mut compressed = Vec::new();
1268        {
1269            let mut encoder = FrameEncoder::new(&mut compressed);
1270            encoder
1271                .write_all(&data)
1272                .expect("encode write should succeed");
1273            encoder.finish().expect("encode finish should succeed");
1274        }
1275
1276        let token = CancellationToken::new();
1277        token.cancel();
1278
1279        let mut decoder = FrameDecoder::new(&compressed[..]).with_cancel(token);
1280        let mut output = Vec::new();
1281        let result = decoder.read_to_end(&mut output);
1282        assert!(result.is_err(), "expected cancellation error from decoder");
1283    }
1284
1285    // -----------------------------------------------------------------------
1286    // Edge-case tests: max-size blocks and boundary conditions
1287    // -----------------------------------------------------------------------
1288
1289    /// Compress exactly MAX_UNCOMPRESSED_CHUNK_SIZE (65536) bytes using
1290    /// FrameEncoder, verify it decompresses correctly via FrameDecoder.
1291    #[test]
1292    fn test_frame_max_size_chunk() {
1293        let data: Vec<u8> = (0..MAX_UNCOMPRESSED_CHUNK_SIZE)
1294            .map(|i| (i % 251) as u8)
1295            .collect();
1296        assert_eq!(data.len(), MAX_UNCOMPRESSED_CHUNK_SIZE);
1297
1298        let mut compressed = Vec::new();
1299        {
1300            let mut encoder = FrameEncoder::new(&mut compressed);
1301            encoder.write_all(&data).expect("write should succeed");
1302            encoder.finish().expect("finish should succeed");
1303        }
1304
1305        let mut decoder = FrameDecoder::new(&compressed[..]);
1306        let mut output = Vec::new();
1307        decoder
1308            .read_to_end(&mut output)
1309            .expect("read should succeed");
1310
1311        assert_eq!(output, data, "max-size chunk roundtrip failed");
1312    }
1313
1314    /// Compress MAX_UNCOMPRESSED_CHUNK_SIZE + 1 (65537) bytes; verify that
1315    /// exactly two compressed data chunks are present in the output, and that
1316    /// the full roundtrip is correct.
1317    ///
1318    /// The framing spec splits input at exactly 65536-byte boundaries, so
1319    /// 65537 bytes → chunk of 65536 + chunk of 1.
1320    #[test]
1321    fn test_frame_just_over_max_chunk() {
1322        let size = MAX_UNCOMPRESSED_CHUNK_SIZE + 1;
1323        let data: Vec<u8> = (0..size).map(|i| (i % 253) as u8).collect();
1324
1325        let mut compressed = Vec::new();
1326        {
1327            let mut encoder = FrameEncoder::new(&mut compressed);
1328            encoder.write_all(&data).expect("write should succeed");
1329            encoder.finish().expect("finish should succeed");
1330        }
1331
1332        // Count data chunks (CHUNK_TYPE_COMPRESSED = 0x00 or CHUNK_TYPE_UNCOMPRESSED = 0x01).
1333        // Skip the 10-byte stream identifier first.
1334        let payload = &compressed[10..];
1335        let mut data_chunk_count = 0usize;
1336        let mut pos = 0usize;
1337        while pos + 4 <= payload.len() {
1338            let chunk_type = payload[pos];
1339            let chunk_len = (payload[pos + 1] as usize)
1340                | ((payload[pos + 2] as usize) << 8)
1341                | ((payload[pos + 3] as usize) << 16);
1342            if chunk_type == CHUNK_TYPE_COMPRESSED || chunk_type == CHUNK_TYPE_UNCOMPRESSED {
1343                data_chunk_count += 1;
1344            }
1345            pos += 4 + chunk_len;
1346        }
1347
1348        assert_eq!(
1349            data_chunk_count, 2,
1350            "expected exactly 2 data chunks for 65537-byte input, got {data_chunk_count}"
1351        );
1352
1353        // Full roundtrip
1354        let mut decoder = FrameDecoder::new(&compressed[..]);
1355        let mut output = Vec::new();
1356        decoder
1357            .read_to_end(&mut output)
1358            .expect("read should succeed");
1359        assert_eq!(output, data, "just-over-max chunk roundtrip failed");
1360    }
1361
1362    /// `max_compress_len(65536)` must be at least 65536 bytes (worst-case
1363    /// incompressible data must still fit in the output).
1364    /// `max_compress_len(0)` must be at least 1 (varint overhead for length).
1365    #[test]
1366    fn test_block_max_compress_len() {
1367        use crate::compress::max_compress_len;
1368
1369        let max_len_65536 = max_compress_len(MAX_UNCOMPRESSED_CHUNK_SIZE);
1370        assert!(
1371            max_len_65536 >= MAX_UNCOMPRESSED_CHUNK_SIZE,
1372            "max_compress_len(65536) = {max_len_65536}, expected >= 65536"
1373        );
1374
1375        let max_len_0 = max_compress_len(0);
1376        assert!(
1377            max_len_0 >= 1,
1378            "max_compress_len(0) = {max_len_0}, expected >= 1"
1379        );
1380    }
1381
1382    /// Feed truncated compressed data to FrameDecoder::read_to_end; verify
1383    /// it returns an error and does not panic.
1384    #[test]
1385    fn test_decompress_truncated_frame() {
1386        // Encode valid data first.
1387        let data = vec![b'X'; 1000];
1388        let mut compressed = Vec::new();
1389        {
1390            let mut encoder = FrameEncoder::new(&mut compressed);
1391            encoder.write_all(&data).expect("write should succeed");
1392            encoder.finish().expect("finish should succeed");
1393        }
1394
1395        // Truncate to half the compressed length (but past the identifier).
1396        let truncated_len = compressed.len() / 2;
1397        let truncated = &compressed[..truncated_len];
1398
1399        let mut decoder = FrameDecoder::new(truncated);
1400        let mut output = Vec::new();
1401        let result = decoder.read_to_end(&mut output);
1402        // Must return an error (UnexpectedEof or InvalidData), never panic.
1403        assert!(
1404            result.is_err(),
1405            "expected error on truncated input, but read_to_end succeeded"
1406        );
1407    }
1408
1409    /// Flip one byte in the CRC field of the first compressed data chunk;
1410    /// verify that FrameDecoder returns a checksum error.
1411    #[test]
1412    fn test_decompress_corrupt_crc() {
1413        let data = vec![b'A'; 500];
1414        let mut compressed = Vec::new();
1415        {
1416            let mut encoder = FrameEncoder::new(&mut compressed);
1417            encoder.write_all(&data).expect("write should succeed");
1418            encoder.finish().expect("finish should succeed");
1419        }
1420
1421        // The stream layout: [10 bytes identifier][4 bytes chunk header][4 bytes CRC][…]
1422        // Flip the first byte of the CRC (byte index 14).
1423        let crc_offset = 14;
1424        assert!(
1425            crc_offset < compressed.len(),
1426            "compressed output is too short to contain a CRC field"
1427        );
1428        let mut corrupt = compressed.clone();
1429        corrupt[crc_offset] ^= 0xFF;
1430
1431        let mut decoder = FrameDecoder::new(&corrupt[..]);
1432        let mut output = Vec::new();
1433        let result = decoder.read_to_end(&mut output);
1434        assert!(
1435            result.is_err(),
1436            "expected checksum error on corrupt CRC, but read_to_end succeeded"
1437        );
1438    }
1439
1440    /// 65536 bytes of all zeros should compress to a much smaller output;
1441    /// roundtrip must be correct.
1442    #[test]
1443    fn test_compress_all_zeros() {
1444        let data = vec![0u8; MAX_UNCOMPRESSED_CHUNK_SIZE];
1445
1446        let mut compressed = Vec::new();
1447        {
1448            let mut encoder = FrameEncoder::new(&mut compressed);
1449            encoder.write_all(&data).expect("write should succeed");
1450            encoder.finish().expect("finish should succeed");
1451        }
1452
1453        // All-zero data should compress very well.
1454        assert!(
1455            compressed.len() < data.len() / 4,
1456            "expected compressed output much smaller than {}, got {}",
1457            data.len(),
1458            compressed.len()
1459        );
1460
1461        let mut decoder = FrameDecoder::new(&compressed[..]);
1462        let mut output = Vec::new();
1463        decoder
1464            .read_to_end(&mut output)
1465            .expect("read should succeed");
1466        assert_eq!(output, data, "all-zeros roundtrip failed");
1467    }
1468
1469    /// 65536 bytes of 0xFF; compression must be correct (roundtrip works),
1470    /// even if output is not smaller.
1471    #[test]
1472    fn test_compress_all_ones() {
1473        let data = vec![0xFFu8; MAX_UNCOMPRESSED_CHUNK_SIZE];
1474
1475        let mut compressed = Vec::new();
1476        {
1477            let mut encoder = FrameEncoder::new(&mut compressed);
1478            encoder.write_all(&data).expect("write should succeed");
1479            encoder.finish().expect("finish should succeed");
1480        }
1481
1482        let mut decoder = FrameDecoder::new(&compressed[..]);
1483        let mut output = Vec::new();
1484        decoder
1485            .read_to_end(&mut output)
1486            .expect("read should succeed");
1487        assert_eq!(output, data, "all-0xFF roundtrip failed");
1488    }
1489
1490    /// Feed a max-size compressed chunk byte-by-byte through a
1491    /// `std::io::BufReader`-backed FrameDecoder; the output must appear
1492    /// correctly (tests the incremental chunk-reading code path).
1493    #[test]
1494    fn test_frame_decoder_incremental_max_size() {
1495        use std::io::BufReader;
1496
1497        let data: Vec<u8> = (0..MAX_UNCOMPRESSED_CHUNK_SIZE)
1498            .map(|i| (i % 199) as u8)
1499            .collect();
1500
1501        let mut compressed = Vec::new();
1502        {
1503            let mut encoder = FrameEncoder::new(&mut compressed);
1504            encoder.write_all(&data).expect("write should succeed");
1505            encoder.finish().expect("finish should succeed");
1506        }
1507
1508        // Wrap the compressed slice in a BufReader with a tiny buffer (1 byte)
1509        // so that the decoder must make many small reads to reassemble each chunk.
1510        let buf_reader = BufReader::with_capacity(1, &compressed[..]);
1511        let mut decoder = FrameDecoder::new(buf_reader);
1512        let mut output = Vec::new();
1513        decoder
1514            .read_to_end(&mut output)
1515            .expect("incremental read should succeed");
1516
1517        assert_eq!(
1518            output, data,
1519            "incremental max-size chunk decoder roundtrip failed"
1520        );
1521    }
1522}