Skip to main content

structured_zstd/encoding/
frame_compressor.rs

//! Utilities and interfaces for encoding an entire frame. Allows reusing resources across frames.
2
3use alloc::{boxed::Box, vec::Vec};
4use core::convert::TryInto;
5#[cfg(feature = "hash")]
6use twox_hash::XxHash64;
7
8#[cfg(feature = "hash")]
9use core::hash::Hasher;
10
11use super::{
12    CompressionLevel, Matcher, block_header::BlockHeader, frame_header::FrameHeader, levels::*,
13    match_generator::MatchGeneratorDriver,
14};
15use crate::fse::fse_encoder::{FSETable, default_ll_table, default_ml_table, default_of_table};
16
17use crate::io::{Read, Write};
18
/// An interface for compressing arbitrary data with the ZStandard compression algorithm.
///
/// `FrameCompressor` will generally be used by:
/// 1. Initializing a compressor by providing a buffer of data using `FrameCompressor::new()`
/// 2. Starting compression and writing that compression into a vec using `FrameCompressor::compress`
///
/// # Examples
/// ```
/// use structured_zstd::encoding::{FrameCompressor, CompressionLevel};
/// let mock_data: &[_] = &[0x1, 0x2, 0x3, 0x4];
/// let mut output = std::vec::Vec::new();
/// // Initialize a compressor.
/// let mut compressor = FrameCompressor::new(CompressionLevel::Uncompressed);
/// compressor.set_source(mock_data);
/// compressor.set_drain(&mut output);
///
/// // `compress` writes the compressed output into the provided buffer.
/// compressor.compress();
/// ```
pub struct FrameCompressor<R: Read, W: Write, M: Matcher> {
    /// Source of uncompressed bytes; `None` until [`Self::set_source`] is called.
    uncompressed_data: Option<R>,
    /// Destination for the encoded frame; `None` until [`Self::set_drain`] is called.
    compressed_data: Option<W>,
    /// Level selecting between raw blocks and the encoded block path in `compress`.
    compression_level: CompressionLevel,
    /// Optional pre-parsed dictionary used to prime matcher and entropy state.
    dictionary: Option<crate::decoding::Dictionary>,
    /// Dictionary entropy tables pre-converted to encoder form; kept in sync with
    /// `dictionary` by `set_dictionary`/`clear_dictionary`.
    dictionary_entropy_cache: Option<CachedDictionaryEntropy>,
    /// Caller-provided uncompressed-size hint for the next frame; consumed
    /// (taken) by `compress`.
    source_size_hint: Option<u64>,
    /// Per-frame encoding state: matcher, entropy tables, and offset history.
    state: CompressState<M>,
    /// Running XXH64 over the uncompressed input; its low 32 bits become the
    /// frame's content checksum.
    #[cfg(feature = "hash")]
    hasher: XxHash64,
}
49
/// Dictionary entropy tables pre-converted into encoder form, so each
/// `compress` call can seed its state without re-parsing the dictionary.
#[derive(Clone, Default)]
struct CachedDictionaryEntropy {
    // Huffman literals table from the dictionary, if representable by the encoder.
    huff: Option<crate::huff0::huff0_encoder::HuffmanTable>,
    // Literal-length FSE table to treat as the "previous" table when seeding a frame.
    ll_previous: Option<PreviousFseTable>,
    // Match-length FSE table to treat as the "previous" table when seeding a frame.
    ml_previous: Option<PreviousFseTable>,
    // Offset FSE table to treat as the "previous" table when seeding a frame.
    of_previous: Option<PreviousFseTable>,
}
57
/// Marker for the FSE table that was in effect for the previous block, letting
/// repeat-table encoding reference it cheaply.
#[derive(Clone)]
pub(crate) enum PreviousFseTable {
    // Default tables are immutable and already stored alongside the state, so
    // repeating them only needs a lightweight marker instead of cloning FSETable.
    Default,
    // A custom table, boxed to keep this enum small.
    Custom(Box<FSETable>),
}
65
66impl PreviousFseTable {
67    pub(crate) fn as_table<'a>(&'a self, default: &'a FSETable) -> &'a FSETable {
68        match self {
69            Self::Default => default,
70            Self::Custom(table) => table,
71        }
72    }
73}
74
/// The predefined sequence-coding FSE tables plus optional "previous" tables
/// carried across blocks for repeat-mode encoding.
pub(crate) struct FseTables {
    // Predefined literal-lengths table (immutable default distribution).
    pub(crate) ll_default: FSETable,
    // Literal-lengths table used by the previous block, if any.
    pub(crate) ll_previous: Option<PreviousFseTable>,
    // Predefined match-lengths table (immutable default distribution).
    pub(crate) ml_default: FSETable,
    // Match-lengths table used by the previous block, if any.
    pub(crate) ml_previous: Option<PreviousFseTable>,
    // Predefined offsets table (immutable default distribution).
    pub(crate) of_default: FSETable,
    // Offsets table used by the previous block, if any.
    pub(crate) of_previous: Option<PreviousFseTable>,
}
83
84impl FseTables {
85    pub fn new() -> Self {
86        Self {
87            ll_default: default_ll_table(),
88            ll_previous: None,
89            ml_default: default_ml_table(),
90            ml_previous: None,
91            of_default: default_of_table(),
92            of_previous: None,
93        }
94    }
95}
96
/// Mutable state threaded through block compression within a single frame.
pub(crate) struct CompressState<M: Matcher> {
    // Match finder producing sequences for the encoded-block path.
    pub(crate) matcher: M,
    // Huffman table from the previous block, enabling repeat literal encoding.
    pub(crate) last_huff_table: Option<crate::huff0::huff0_encoder::HuffmanTable>,
    // Default FSE tables plus previous-block tables for sequence encoding.
    pub(crate) fse_tables: FseTables,
    /// Offset history for repeat offset encoding: [rep0, rep1, rep2].
    /// Initialized to [1, 4, 8] per RFC 8878 §3.1.2.5.
    pub(crate) offset_hist: [u32; 3],
}
105
106impl<R: Read, W: Write> FrameCompressor<R, W, MatchGeneratorDriver> {
107    /// Create a new `FrameCompressor`
108    pub fn new(compression_level: CompressionLevel) -> Self {
109        Self {
110            uncompressed_data: None,
111            compressed_data: None,
112            compression_level,
113            dictionary: None,
114            dictionary_entropy_cache: None,
115            source_size_hint: None,
116            state: CompressState {
117                matcher: MatchGeneratorDriver::new(1024 * 128, 1),
118                last_huff_table: None,
119                fse_tables: FseTables::new(),
120                offset_hist: [1, 4, 8],
121            },
122            #[cfg(feature = "hash")]
123            hasher: XxHash64::with_seed(0),
124        }
125    }
126}
127
128impl<R: Read, W: Write, M: Matcher> FrameCompressor<R, W, M> {
129    /// Create a new `FrameCompressor` with a custom matching algorithm implementation
130    pub fn new_with_matcher(matcher: M, compression_level: CompressionLevel) -> Self {
131        Self {
132            uncompressed_data: None,
133            compressed_data: None,
134            dictionary: None,
135            dictionary_entropy_cache: None,
136            source_size_hint: None,
137            state: CompressState {
138                matcher,
139                last_huff_table: None,
140                fse_tables: FseTables::new(),
141                offset_hist: [1, 4, 8],
142            },
143            compression_level,
144            #[cfg(feature = "hash")]
145            hasher: XxHash64::with_seed(0),
146        }
147    }
148
149    /// Before calling [FrameCompressor::compress] you need to set the source.
150    ///
151    /// This is the data that is compressed and written into the drain.
152    pub fn set_source(&mut self, uncompressed_data: R) -> Option<R> {
153        self.uncompressed_data.replace(uncompressed_data)
154    }
155
156    /// Before calling [FrameCompressor::compress] you need to set the drain.
157    ///
158    /// As the compressor compresses data, the drain serves as a place for the output to be writte.
159    pub fn set_drain(&mut self, compressed_data: W) -> Option<W> {
160        self.compressed_data.replace(compressed_data)
161    }
162
163    /// Provide a hint about the total uncompressed size for the next frame.
164    ///
165    /// When set, the encoder selects smaller hash tables and windows for
166    /// small inputs, matching the C zstd source-size-class behavior.
167    ///
168    /// This hint applies only to frame payload bytes (`size`). Dictionary
169    /// history is primed separately and does not inflate the hinted size or
170    /// advertised frame window.
171    /// Must be called before [`compress`](Self::compress).
172    pub fn set_source_size_hint(&mut self, size: u64) {
173        self.source_size_hint = Some(size);
174    }
175
    /// Compress the uncompressed data from the provided source as one Zstd frame and write it to the provided drain
    ///
    /// This will repeatedly call [Read::read] on the source to fill up blocks until the source returns 0 on the read call.
    /// All compressed blocks are buffered in memory so that the frame header can include the
    /// `Frame_Content_Size` field (which requires knowing the total uncompressed size). The
    /// entire frame — header, blocks, and optional checksum — is then written to the drain
    /// at the end. This means peak memory usage is O(compressed_size).
    ///
    /// To avoid endlessly encoding from a potentially endless source (like a network socket) you can use the
    /// [Read::take] function
    ///
    /// # Panics
    ///
    /// Panics if no source or drain has been set ([`Self::set_source`] /
    /// [`Self::set_drain`]), if the matcher reports a zero window size, or if
    /// reading from the source or writing to the drain returns an error.
    pub fn compress(&mut self) {
        // Record whether the caller hinted a small (<= 16 KiB) payload before the
        // hint itself is consumed below; this drives single-segment framing later.
        let small_source_hint = self.source_size_hint.map(|size| size <= (1 << 14));
        // Dictionary state is only engaged on the compressed path, and only when
        // the matcher can actually be primed with dictionary content.
        let use_dictionary_state =
            !matches!(self.compression_level, CompressionLevel::Uncompressed)
                && self.state.matcher.supports_dictionary_priming()
                && self.dictionary.is_some();
        if let Some(size_hint) = self.source_size_hint.take() {
            // Keep source-size hint scoped to payload bytes; dictionary priming
            // is applied separately and should not force larger matcher sizing.
            self.state.matcher.set_source_size_hint(size_hint);
        }
        // Clearing buffers to allow re-using of the compressor
        self.state.matcher.reset(self.compression_level);
        self.state.offset_hist = [1, 4, 8];
        let cached_entropy = if use_dictionary_state {
            self.dictionary_entropy_cache.as_ref()
        } else {
            None
        };
        if use_dictionary_state && let Some(dict) = self.dictionary.as_ref() {
            // This state drives sequence encoding, while matcher priming below updates
            // the match generator's internal repeat-offset history for match finding.
            self.state.offset_hist = dict.offset_hist;
            self.state
                .matcher
                .prime_with_dictionary(dict.dict_content.as_slice(), dict.offset_hist);
        }
        // Seed (or clear) the previous-block Huffman table from the dictionary cache.
        if let Some(cache) = cached_entropy {
            self.state.last_huff_table.clone_from(&cache.huff);
        } else {
            self.state.last_huff_table = None;
        }
        // `clone_from` keeps frame-to-frame seeding cheap for reused compressors by
        // reusing existing allocations where possible instead of reallocating every frame.
        if let Some(cache) = cached_entropy {
            self.state
                .fse_tables
                .ll_previous
                .clone_from(&cache.ll_previous);
            self.state
                .fse_tables
                .ml_previous
                .clone_from(&cache.ml_previous);
            self.state
                .fse_tables
                .of_previous
                .clone_from(&cache.of_previous);
        } else {
            self.state.fse_tables.ll_previous = None;
            self.state.fse_tables.ml_previous = None;
            self.state.fse_tables.of_previous = None;
        }
        #[cfg(feature = "hash")]
        {
            // Fresh checksum state per frame; the hash covers uncompressed bytes only.
            self.hasher = XxHash64::with_seed(0);
        }
        let source = self.uncompressed_data.as_mut().unwrap();
        let drain = self.compressed_data.as_mut().unwrap();
        let window_size = self.state.matcher.window_size();
        assert!(
            window_size != 0,
            "matcher reported window_size == 0, which is invalid"
        );
        // Accumulate all compressed blocks; the frame header is written after
        // all input has been read so that Frame_Content_Size is known.
        let mut all_blocks: Vec<u8> = Vec::with_capacity(1024 * 130);
        let mut total_uncompressed: u64 = 0;
        // Compress block by block
        loop {
            // Read a single block's worth of uncompressed data from the input
            let mut uncompressed_data = self.state.matcher.get_next_space();
            let mut read_bytes = 0;
            let last_block;
            'read_loop: loop {
                // Keep reading until the block buffer is full or the source is
                // exhausted; a zero-byte read marks this as the final block.
                let new_bytes = source.read(&mut uncompressed_data[read_bytes..]).unwrap();
                if new_bytes == 0 {
                    last_block = true;
                    break 'read_loop;
                }
                read_bytes += new_bytes;
                if read_bytes == uncompressed_data.len() {
                    last_block = false;
                    break 'read_loop;
                }
            }
            // Shrink the buffer to the bytes actually read.
            uncompressed_data.resize(read_bytes, 0);
            total_uncompressed += read_bytes as u64;
            // As we read, hash that data too
            #[cfg(feature = "hash")]
            self.hasher.write(&uncompressed_data);
            // Special handling is needed for compression of a totally empty file
            if uncompressed_data.is_empty() {
                let header = BlockHeader {
                    last_block: true,
                    block_type: crate::blocks::block::BlockType::Raw,
                    block_size: 0,
                };
                header.serialize(&mut all_blocks);
                break;
            }

            match self.compression_level {
                CompressionLevel::Uncompressed => {
                    // Raw block: header plus the input bytes verbatim.
                    let header = BlockHeader {
                        last_block,
                        block_type: crate::blocks::block::BlockType::Raw,
                        block_size: read_bytes.try_into().unwrap(),
                    };
                    header.serialize(&mut all_blocks);
                    all_blocks.extend_from_slice(&uncompressed_data);
                }
                CompressionLevel::Fastest
                | CompressionLevel::Default
                | CompressionLevel::Better
                | CompressionLevel::Best
                | CompressionLevel::Level(_) => {
                    compress_block_encoded(
                        &mut self.state,
                        self.compression_level,
                        last_block,
                        uncompressed_data,
                        &mut all_blocks,
                    );
                }
            }
            if last_block {
                break;
            }
        }

        // Now that total_uncompressed is known, write the frame header with FCS.
        // Keep hinted tiny one-shot frames in single-segment mode to match the
        // donor framing policy used by the FFI path across levels.
        // Guard out sub-512 byte payloads for now: tiny compressed-path
        // single-segment framing is not yet fully C-FFI compatible.
        let single_segment = !use_dictionary_state
            && small_source_hint == Some(true)
            && (512..=(1 << 14)).contains(&total_uncompressed);
        let header = FrameHeader {
            frame_content_size: Some(total_uncompressed),
            single_segment,
            content_checksum: cfg!(feature = "hash"),
            dictionary_id: if use_dictionary_state {
                self.dictionary.as_ref().map(|dict| dict.id as u64)
            } else {
                None
            },
            // Single-segment frames omit the window descriptor entirely.
            window_size: if single_segment {
                None
            } else {
                Some(window_size)
            },
        };
        // Write the frame header and compressed blocks separately to avoid
        // shifting the entire `all_blocks` buffer to prepend the header.
        let mut header_buf: Vec<u8> = Vec::with_capacity(14);
        header.serialize(&mut header_buf);
        drain.write_all(&header_buf).unwrap();
        drain.write_all(&all_blocks).unwrap();

        // If the `hash` feature is enabled, then `content_checksum` is set to true in the header
        // and a 32 bit hash is written at the end of the data.
        #[cfg(feature = "hash")]
        {
            // Because we only have the data as a reader, we need to read all of it to calculate the checksum
            // Possible TODO: create a wrapper around self.uncompressed data that hashes the data as it's read?
            // The frame checksum is the low 32 bits of the XXH64 digest.
            let content_checksum = self.hasher.finish();
            drain
                .write_all(&(content_checksum as u32).to_le_bytes())
                .unwrap();
        }
    }
358
    /// Get a mutable reference to the source
    ///
    /// Returns `None` if no source has been set via [FrameCompressor::set_source].
    pub fn source_mut(&mut self) -> Option<&mut R> {
        self.uncompressed_data.as_mut()
    }
363
    /// Get a mutable reference to the drain
    ///
    /// Returns `None` if no drain has been set via [FrameCompressor::set_drain].
    pub fn drain_mut(&mut self) -> Option<&mut W> {
        self.compressed_data.as_mut()
    }
368
    /// Get a reference to the source
    ///
    /// Returns `None` if no source has been set via [FrameCompressor::set_source].
    pub fn source(&self) -> Option<&R> {
        self.uncompressed_data.as_ref()
    }
373
    /// Get a reference to the drain
    ///
    /// Returns `None` if no drain has been set via [FrameCompressor::set_drain].
    pub fn drain(&self) -> Option<&W> {
        self.compressed_data.as_ref()
    }
378
379    /// Retrieve the source
380    pub fn take_source(&mut self) -> Option<R> {
381        self.uncompressed_data.take()
382    }
383
384    /// Retrieve the drain
385    pub fn take_drain(&mut self) -> Option<W> {
386        self.compressed_data.take()
387    }
388
389    /// Before calling [FrameCompressor::compress] you can replace the matcher
390    pub fn replace_matcher(&mut self, mut match_generator: M) -> M {
391        core::mem::swap(&mut match_generator, &mut self.state.matcher);
392        match_generator
393    }
394
395    /// Before calling [FrameCompressor::compress] you can replace the compression level
396    pub fn set_compression_level(
397        &mut self,
398        compression_level: CompressionLevel,
399    ) -> CompressionLevel {
400        let old = self.compression_level;
401        self.compression_level = compression_level;
402        old
403    }
404
    /// Get the current compression level
    ///
    /// This is the level the next [FrameCompressor::compress] call will use.
    pub fn compression_level(&self) -> CompressionLevel {
        self.compression_level
    }
409
410    /// Attach a pre-parsed dictionary to be used for subsequent compressions.
411    ///
412    /// In compressed modes, the dictionary id is written only when the active
413    /// matcher supports dictionary priming.
414    /// Uncompressed mode and non-priming matchers ignore the attached dictionary
415    /// at encode time.
416    pub fn set_dictionary(
417        &mut self,
418        dictionary: crate::decoding::Dictionary,
419    ) -> Result<Option<crate::decoding::Dictionary>, crate::decoding::errors::DictionaryDecodeError>
420    {
421        if dictionary.id == 0 {
422            return Err(crate::decoding::errors::DictionaryDecodeError::ZeroDictionaryId);
423        }
424        if let Some(index) = dictionary.offset_hist.iter().position(|&rep| rep == 0) {
425            return Err(
426                crate::decoding::errors::DictionaryDecodeError::ZeroRepeatOffsetInDictionary {
427                    index: index as u8,
428                },
429            );
430        }
431        self.dictionary_entropy_cache = Some(CachedDictionaryEntropy {
432            huff: dictionary.huf.table.to_encoder_table(),
433            ll_previous: dictionary
434                .fse
435                .literal_lengths
436                .to_encoder_table()
437                .map(|table| PreviousFseTable::Custom(Box::new(table))),
438            ml_previous: dictionary
439                .fse
440                .match_lengths
441                .to_encoder_table()
442                .map(|table| PreviousFseTable::Custom(Box::new(table))),
443            of_previous: dictionary
444                .fse
445                .offsets
446                .to_encoder_table()
447                .map(|table| PreviousFseTable::Custom(Box::new(table))),
448        });
449        Ok(self.dictionary.replace(dictionary))
450    }
451
452    /// Parse and attach a serialized dictionary blob.
453    pub fn set_dictionary_from_bytes(
454        &mut self,
455        raw_dictionary: &[u8],
456    ) -> Result<Option<crate::decoding::Dictionary>, crate::decoding::errors::DictionaryDecodeError>
457    {
458        let dictionary = crate::decoding::Dictionary::decode_dict(raw_dictionary)?;
459        self.set_dictionary(dictionary)
460    }
461
462    /// Remove the attached dictionary.
463    pub fn clear_dictionary(&mut self) -> Option<crate::decoding::Dictionary> {
464        self.dictionary_entropy_cache = None;
465        self.dictionary.take()
466    }
467}
468
469#[cfg(test)]
470mod tests {
471    #[cfg(all(feature = "dict_builder", feature = "std"))]
472    use alloc::format;
473    use alloc::vec;
474
475    use super::FrameCompressor;
476    use crate::blocks::block::BlockType;
477    use crate::common::MAGIC_NUM;
478    use crate::decoding::{FrameDecoder, block_decoder, frame::read_frame_header};
479    use crate::encoding::{Matcher, Sequence};
480    use alloc::vec::Vec;
481
482    fn generate_data(seed: u64, len: usize) -> Vec<u8> {
483        let mut state = seed;
484        let mut data = Vec::with_capacity(len);
485        for _ in 0..len {
486            state = state
487                .wrapping_mul(6364136223846793005)
488                .wrapping_add(1442695040888963407);
489            data.push((state >> 33) as u8);
490        }
491        data
492    }
493
494    fn first_block_type(frame: &[u8]) -> BlockType {
495        let (_, header_size) = read_frame_header(frame).expect("frame header should parse");
496        let mut decoder = block_decoder::new();
497        let (header, _) = decoder
498            .read_block_header(&frame[header_size as usize..])
499            .expect("block header should parse");
500        header.block_type
501    }
502
503    /// Frame content size is written correctly and C zstd can decompress the output.
504    #[cfg(feature = "std")]
505    #[test]
506    fn fcs_header_written_and_c_zstd_compatible() {
507        let levels = [
508            crate::encoding::CompressionLevel::Uncompressed,
509            crate::encoding::CompressionLevel::Fastest,
510            crate::encoding::CompressionLevel::Default,
511            crate::encoding::CompressionLevel::Better,
512            crate::encoding::CompressionLevel::Best,
513        ];
514        let fcs_2byte = vec![0xCDu8; 300]; // 300 bytes โ†’ 2-byte FCS (256..=65791 range)
515        let large = vec![0xABu8; 100_000];
516        let inputs: [&[u8]; 5] = [
517            &[],
518            &[0x00],
519            b"abcdefghijklmnopqrstuvwxy\n",
520            &fcs_2byte,
521            &large,
522        ];
523        for level in levels {
524            for data in &inputs {
525                let compressed = crate::encoding::compress_to_vec(*data, level);
526                // Verify FCS is present and correct
527                let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
528                    .unwrap()
529                    .0;
530                assert_eq!(
531                    header.frame_content_size(),
532                    data.len() as u64,
533                    "FCS mismatch for len={} level={:?}",
534                    data.len(),
535                    level,
536                );
537                // Confirm the FCS field is actually present in the header
538                // (not just the decoder returning 0 for absent FCS).
539                assert_ne!(
540                    header.descriptor.frame_content_size_bytes().unwrap(),
541                    0,
542                    "FCS field must be present for len={} level={:?}",
543                    data.len(),
544                    level,
545                );
546                // Verify C zstd can decompress
547                let mut decoded = Vec::new();
548                zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(
549                    |e| {
550                        panic!(
551                            "C zstd decode failed for len={} level={level:?}: {e}",
552                            data.len()
553                        )
554                    },
555                );
556                assert_eq!(
557                    decoded.as_slice(),
558                    *data,
559                    "C zstd roundtrip failed for len={}",
560                    data.len()
561                );
562            }
563        }
564    }
565
566    #[cfg(feature = "std")]
567    #[test]
568    fn source_size_hint_fastest_remains_ffi_compatible_small_input() {
569        let data = vec![0xAB; 2047];
570        let compressed = {
571            let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
572            compressor.set_source_size_hint(data.len() as u64);
573            compressor.set_source(data.as_slice());
574            let mut out = Vec::new();
575            compressor.set_drain(&mut out);
576            compressor.compress();
577            out
578        };
579
580        let mut decoded = Vec::new();
581        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
582        assert_eq!(decoded, data);
583    }
584
585    #[cfg(feature = "std")]
586    #[test]
587    fn small_hinted_default_frame_uses_single_segment_header() {
588        let data = generate_data(0xD15E_A5ED, 1024);
589        let compressed = {
590            let mut compressor = FrameCompressor::new(super::CompressionLevel::Default);
591            compressor.set_source_size_hint(data.len() as u64);
592            compressor.set_source(data.as_slice());
593            let mut out = Vec::new();
594            compressor.set_drain(&mut out);
595            compressor.compress();
596            out
597        };
598
599        let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
600        assert!(
601            frame_header.descriptor.single_segment_flag(),
602            "small hinted default frames should use single-segment header for Rust/FFI parity"
603        );
604        assert_eq!(frame_header.frame_content_size(), data.len() as u64);
605        let mut decoded = Vec::new();
606        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded)
607            .expect("ffi decoder must accept single-segment small hinted default frame");
608        assert_eq!(decoded, data);
609    }
610
611    #[cfg(feature = "std")]
612    #[test]
613    fn small_hinted_numeric_default_levels_use_single_segment_header() {
614        let data = generate_data(0xA11C_E003, 1024);
615        for level in [
616            super::CompressionLevel::Level(0),
617            super::CompressionLevel::Level(3),
618        ] {
619            let compressed = {
620                let mut compressor = FrameCompressor::new(level);
621                compressor.set_source_size_hint(data.len() as u64);
622                compressor.set_source(data.as_slice());
623                let mut out = Vec::new();
624                compressor.set_drain(&mut out);
625                compressor.compress();
626                out
627            };
628
629            let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
630            assert!(
631                frame_header.descriptor.single_segment_flag(),
632                "small hinted numeric default level frames should use single-segment header (level={level:?})"
633            );
634            assert_eq!(frame_header.frame_content_size(), data.len() as u64);
635            let mut decoded = Vec::new();
636            zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(|e| {
637                panic!(
638                    "ffi decoder must accept single-segment small hinted numeric default level frame (level={level:?}): {e}"
639                )
640            });
641            assert_eq!(decoded, data);
642        }
643    }
644
645    #[cfg(feature = "std")]
646    #[test]
647    fn source_size_hint_levels_remain_ffi_compatible_small_inputs_matrix() {
648        let levels = [
649            super::CompressionLevel::Fastest,
650            super::CompressionLevel::Default,
651            super::CompressionLevel::Better,
652            super::CompressionLevel::Best,
653            super::CompressionLevel::Level(-1),
654            super::CompressionLevel::Level(2),
655            super::CompressionLevel::Level(3),
656            super::CompressionLevel::Level(4),
657            super::CompressionLevel::Level(11),
658        ];
659        let sizes = [
660            511usize, 512, 513, 1023, 1024, 1536, 2047, 2048, 4095, 4096, 8191, 16_384, 16_385,
661        ];
662
663        for (seed_idx, seed) in [11u64, 23, 41].into_iter().enumerate() {
664            for &size in &sizes {
665                let data = generate_data(seed + seed_idx as u64, size);
666                for &level in &levels {
667                    let compressed = {
668                        let mut compressor = FrameCompressor::new(level);
669                        compressor.set_source_size_hint(data.len() as u64);
670                        compressor.set_source(data.as_slice());
671                        let mut out = Vec::new();
672                        compressor.set_drain(&mut out);
673                        compressor.compress();
674                        out
675                    };
676                    if matches!(size, 511 | 512) {
677                        let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
678                        assert_eq!(
679                            frame_header.descriptor.single_segment_flag(),
680                            size == 512,
681                            "single_segment 511/512 boundary mismatch: level={level:?} size={size}"
682                        );
683                    }
684
685                    let mut decoded = Vec::new();
686                    zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(
687                        |e| {
688                            panic!(
689                                "ffi decode failed with source-size hint: level={level:?} size={size} seed={} err={e}",
690                                seed + seed_idx as u64
691                            )
692                        },
693                    );
694                    assert_eq!(
695                        decoded,
696                        data,
697                        "hinted ffi roundtrip mismatch: level={level:?} size={size} seed={}",
698                        seed + seed_idx as u64
699                    );
700                }
701            }
702        }
703    }
704
705    #[cfg(feature = "std")]
706    #[test]
707    fn hinted_levels_use_single_segment_header_symmetrically() {
708        let levels = [
709            super::CompressionLevel::Fastest,
710            super::CompressionLevel::Default,
711            super::CompressionLevel::Better,
712            super::CompressionLevel::Best,
713            super::CompressionLevel::Level(0),
714            super::CompressionLevel::Level(2),
715            super::CompressionLevel::Level(3),
716            super::CompressionLevel::Level(4),
717            super::CompressionLevel::Level(11),
718        ];
719        for (seed_idx, seed) in [7u64, 23, 41].into_iter().enumerate() {
720            let size = 1024 + seed_idx * 97;
721            let data = generate_data(seed, size);
722            for &level in &levels {
723                let compressed = {
724                    let mut compressor = FrameCompressor::new(level);
725                    compressor.set_source_size_hint(data.len() as u64);
726                    compressor.set_source(data.as_slice());
727                    let mut out = Vec::new();
728                    compressor.set_drain(&mut out);
729                    compressor.compress();
730                    out
731                };
732                let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
733                assert!(
734                    frame_header.descriptor.single_segment_flag(),
735                    "hinted frame should be single-segment for level={level:?} size={}",
736                    data.len()
737                );
738                assert_eq!(frame_header.frame_content_size(), data.len() as u64);
739                let mut decoded = Vec::new();
740                zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(|e| {
741                    panic!(
742                        "ffi decode failed for hinted single-segment parity: level={level:?} size={} err={e}",
743                        data.len()
744                    )
745                });
746                assert_eq!(decoded, data);
747            }
748        }
749    }
750
751    #[cfg(feature = "std")]
752    #[test]
753    fn hinted_levels_pin_511_512_single_segment_boundary() {
754        let levels = [
755            super::CompressionLevel::Fastest,
756            super::CompressionLevel::Default,
757            super::CompressionLevel::Better,
758            super::CompressionLevel::Best,
759            super::CompressionLevel::Level(0),
760            super::CompressionLevel::Level(2),
761            super::CompressionLevel::Level(3),
762            super::CompressionLevel::Level(4),
763            super::CompressionLevel::Level(11),
764        ];
765        for (seed_idx, seed) in [7u64, 23, 41].into_iter().enumerate() {
766            for &size in &[511usize, 512] {
767                let data = generate_data(seed + seed_idx as u64, size);
768                for &level in &levels {
769                    let compressed = {
770                        let mut compressor = FrameCompressor::new(level);
771                        compressor.set_source_size_hint(data.len() as u64);
772                        compressor.set_source(data.as_slice());
773                        let mut out = Vec::new();
774                        compressor.set_drain(&mut out);
775                        compressor.compress();
776                        out
777                    };
778                    let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
779                    assert_eq!(
780                        frame_header.descriptor.single_segment_flag(),
781                        size == 512,
782                        "single_segment 511/512 boundary mismatch: level={level:?} size={size}"
783                    );
784                    let mut decoded = Vec::new();
785                    zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap_or_else(
786                        |e| {
787                            panic!(
788                                "ffi decode failed at single-segment boundary: level={level:?} size={size} seed={} err={e}",
789                                seed + seed_idx as u64
790                            )
791                        },
792                    );
793                    assert_eq!(decoded, data);
794                }
795            }
796        }
797    }
798
799    #[cfg(feature = "std")]
800    #[test]
801    fn fastest_random_block_uses_raw_fast_path() {
802        let data = generate_data(0xC0FF_EE11, 10 * 1024);
803        let compressed =
804            crate::encoding::compress_to_vec(data.as_slice(), super::CompressionLevel::Fastest);
805
806        assert_eq!(first_block_type(&compressed), BlockType::Raw);
807
808        let mut decoded = Vec::new();
809        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
810        assert_eq!(decoded, data);
811    }
812
813    #[cfg(feature = "std")]
814    #[test]
815    fn default_random_block_uses_raw_fast_path() {
816        let data = generate_data(0xD15E_A5ED, 10 * 1024);
817        let compressed =
818            crate::encoding::compress_to_vec(data.as_slice(), super::CompressionLevel::Default);
819
820        assert_eq!(first_block_type(&compressed), BlockType::Raw);
821
822        let mut decoded = Vec::new();
823        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
824        assert_eq!(decoded, data);
825    }
826
827    #[cfg(feature = "std")]
828    #[test]
829    fn best_random_block_uses_raw_fast_path() {
830        let data = generate_data(0xB35C_AFE1, 10 * 1024);
831        let compressed =
832            crate::encoding::compress_to_vec(data.as_slice(), super::CompressionLevel::Best);
833
834        assert_eq!(first_block_type(&compressed), BlockType::Raw);
835
836        let mut decoded = Vec::new();
837        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
838        assert_eq!(decoded, data);
839    }
840
841    #[cfg(feature = "std")]
842    #[test]
843    fn level2_random_block_uses_raw_fast_path() {
844        let data = generate_data(0xA11C_E222, 10 * 1024);
845        let compressed =
846            crate::encoding::compress_to_vec(data.as_slice(), super::CompressionLevel::Level(2));
847
848        assert_eq!(first_block_type(&compressed), BlockType::Raw);
849
850        let mut decoded = Vec::new();
851        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
852        assert_eq!(decoded, data);
853    }
854
855    #[cfg(feature = "std")]
856    #[test]
857    fn better_random_block_uses_raw_fast_path() {
858        let data = generate_data(0xBE77_E111, 10 * 1024);
859        let compressed =
860            crate::encoding::compress_to_vec(data.as_slice(), super::CompressionLevel::Better);
861
862        assert_eq!(first_block_type(&compressed), BlockType::Raw);
863
864        let mut decoded = Vec::new();
865        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
866        assert_eq!(decoded, data);
867    }
868
869    #[cfg(feature = "std")]
870    #[test]
871    fn compressible_logs_do_not_fall_back_to_raw_fast_path() {
872        let mut data = Vec::with_capacity(16 * 1024);
873        const LINE: &[u8] =
874            b"ts=2026-04-10T00:00:00Z level=INFO tenant=demo op=flush table=orders\n";
875        while data.len() < 16 * 1024 {
876            let remaining = 16 * 1024 - data.len();
877            data.extend_from_slice(&LINE[..LINE.len().min(remaining)]);
878        }
879
880        fn assert_not_raw_for_level(data: &[u8], level: super::CompressionLevel) {
881            let compressed = crate::encoding::compress_to_vec(data, level);
882            assert_ne!(first_block_type(&compressed), BlockType::Raw);
883            assert!(
884                compressed.len() < data.len(),
885                "compressible input should remain compressible for level={level:?}"
886            );
887            let mut decoded = Vec::new();
888            zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
889            assert_eq!(decoded, data);
890        }
891
892        assert_not_raw_for_level(data.as_slice(), super::CompressionLevel::Fastest);
893        assert_not_raw_for_level(data.as_slice(), super::CompressionLevel::Default);
894        assert_not_raw_for_level(data.as_slice(), super::CompressionLevel::Level(3));
895        assert_not_raw_for_level(data.as_slice(), super::CompressionLevel::Better);
896        assert_not_raw_for_level(data.as_slice(), super::CompressionLevel::Best);
897    }
898
899    #[cfg(feature = "std")]
900    #[test]
901    fn hinted_small_compressible_frames_use_single_segment_across_levels() {
902        let mut data = Vec::with_capacity(4 * 1024);
903        const LINE: &[u8] =
904            b"ts=2026-04-10T00:00:00Z level=INFO tenant=demo op=flush table=orders\n";
905        while data.len() < 4 * 1024 {
906            let remaining = 4 * 1024 - data.len();
907            data.extend_from_slice(&LINE[..LINE.len().min(remaining)]);
908        }
909
910        for level in [
911            super::CompressionLevel::Fastest,
912            super::CompressionLevel::Default,
913            super::CompressionLevel::Better,
914            super::CompressionLevel::Best,
915            super::CompressionLevel::Level(0),
916            super::CompressionLevel::Level(3),
917            super::CompressionLevel::Level(4),
918            super::CompressionLevel::Level(11),
919        ] {
920            let compressed = {
921                let mut compressor = FrameCompressor::new(level);
922                compressor.set_source_size_hint(data.len() as u64);
923                compressor.set_source(data.as_slice());
924                let mut out = Vec::new();
925                compressor.set_drain(&mut out);
926                compressor.compress();
927                out
928            };
929            let (frame_header, _) = read_frame_header(compressed.as_slice()).unwrap();
930            assert!(
931                frame_header.descriptor.single_segment_flag(),
932                "hinted small compressible frame should use single-segment (level={level:?})"
933            );
934            assert_ne!(
935                first_block_type(&compressed),
936                BlockType::Raw,
937                "compressible hinted frame should stay off raw fast path (level={level:?})"
938            );
939            assert!(
940                compressed.len() < data.len(),
941                "compressible hinted frame should still shrink (level={level:?})"
942            );
943            let mut decoded = Vec::new();
944            zstd::stream::copy_decode(compressed.as_slice(), &mut decoded)
945                .unwrap_or_else(|e| panic!("ffi decode failed (level={level:?}): {e}"));
946            assert_eq!(decoded, data);
947        }
948    }
949
    /// Test-only matcher that never finds matches: it only echoes its last
    /// committed space back as literals (see `impl Matcher` below) and does
    /// not pick up any dictionary history.
    struct NoDictionaryMatcher {
        // Most recently committed scratch buffer; emitted as plain literals.
        last_space: Vec<u8>,
        // Value returned by `window_size()` and used to size new scratch buffers.
        window_size: u64,
    }
954
955    impl NoDictionaryMatcher {
956        fn new(window_size: u64) -> Self {
957            Self {
958                last_space: Vec::new(),
959                window_size,
960            }
961        }
962    }
963
964    impl Matcher for NoDictionaryMatcher {
965        fn get_next_space(&mut self) -> Vec<u8> {
966            vec![0; self.window_size as usize]
967        }
968
969        fn get_last_space(&mut self) -> &[u8] {
970            self.last_space.as_slice()
971        }
972
973        fn commit_space(&mut self, space: Vec<u8>) {
974            self.last_space = space;
975        }
976
977        fn skip_matching(&mut self) {}
978
979        fn start_matching(&mut self, mut handle_sequence: impl for<'a> FnMut(Sequence<'a>)) {
980            handle_sequence(Sequence::Literals {
981                literals: self.last_space.as_slice(),
982            });
983        }
984
985        fn reset(&mut self, _level: super::CompressionLevel) {
986            self.last_space.clear();
987        }
988
989        fn window_size(&self) -> u64 {
990            self.window_size
991        }
992    }
993
994    #[test]
995    fn frame_starts_with_magic_num() {
996        let mock_data = [1_u8, 2, 3].as_slice();
997        let mut output: Vec<u8> = Vec::new();
998        let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
999        compressor.set_source(mock_data);
1000        compressor.set_drain(&mut output);
1001
1002        compressor.compress();
1003        assert!(output.starts_with(&MAGIC_NUM.to_le_bytes()));
1004    }
1005
1006    #[test]
1007    fn very_simple_raw_compress() {
1008        let mock_data = [1_u8, 2, 3].as_slice();
1009        let mut output: Vec<u8> = Vec::new();
1010        let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
1011        compressor.set_source(mock_data);
1012        compressor.set_drain(&mut output);
1013
1014        compressor.compress();
1015    }
1016
1017    #[test]
1018    fn very_simple_compress() {
1019        let mut mock_data = vec![0; 1 << 17];
1020        mock_data.extend(vec![1; (1 << 17) - 1]);
1021        mock_data.extend(vec![2; (1 << 18) - 1]);
1022        mock_data.extend(vec![2; 1 << 17]);
1023        mock_data.extend(vec![3; (1 << 17) - 1]);
1024        let mut output: Vec<u8> = Vec::new();
1025        let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
1026        compressor.set_source(mock_data.as_slice());
1027        compressor.set_drain(&mut output);
1028
1029        compressor.compress();
1030
1031        let mut decoder = FrameDecoder::new();
1032        let mut decoded = Vec::with_capacity(mock_data.len());
1033        decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
1034        assert_eq!(mock_data, decoded);
1035
1036        let mut decoded = Vec::new();
1037        zstd::stream::copy_decode(output.as_slice(), &mut decoded).unwrap();
1038        assert_eq!(mock_data, decoded);
1039    }
1040
1041    #[test]
1042    fn rle_compress() {
1043        let mock_data = vec![0; 1 << 19];
1044        let mut output: Vec<u8> = Vec::new();
1045        let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
1046        compressor.set_source(mock_data.as_slice());
1047        compressor.set_drain(&mut output);
1048
1049        compressor.compress();
1050
1051        let mut decoder = FrameDecoder::new();
1052        let mut decoded = Vec::with_capacity(mock_data.len());
1053        decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
1054        assert_eq!(mock_data, decoded);
1055    }
1056
1057    #[test]
1058    fn aaa_compress() {
1059        let mock_data = vec![0, 1, 3, 4, 5];
1060        let mut output: Vec<u8> = Vec::new();
1061        let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
1062        compressor.set_source(mock_data.as_slice());
1063        compressor.set_drain(&mut output);
1064
1065        compressor.compress();
1066
1067        let mut decoder = FrameDecoder::new();
1068        let mut decoded = Vec::with_capacity(mock_data.len());
1069        decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
1070        assert_eq!(mock_data, decoded);
1071
1072        let mut decoded = Vec::new();
1073        zstd::stream::copy_decode(output.as_slice(), &mut decoded).unwrap();
1074        assert_eq!(mock_data, decoded);
1075    }
1076
    #[test]
    fn dictionary_compression_sets_required_dict_id_and_roundtrips() {
        // End-to-end dictionary flow: attach a parsed dictionary, confirm the
        // frame header advertises its id, confirm decoding without the
        // dictionary fails, then roundtrip with both our decoder and the
        // reference ffi decoder.
        let dict_raw = include_bytes!("../../dict_tests/dictionary");
        let dict_for_encoder = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();
        let dict_for_decoder = crate::decoding::Dictionary::decode_dict(dict_raw).unwrap();

        // Payload is built from the dictionary's own content so matches
        // against the dictionary history are actually possible.
        let mut data = Vec::new();
        for _ in 0..8 {
            data.extend_from_slice(&dict_for_decoder.dict_content[..2048]);
        }

        let mut with_dict = Vec::new();
        let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
        let previous = compressor
            .set_dictionary_from_bytes(dict_raw)
            .expect("dictionary bytes should parse");
        assert!(
            previous.is_none(),
            "first dictionary insert should return None"
        );
        // Replacing the dictionary must hand back the previously attached one.
        assert_eq!(
            compressor
                .set_dictionary(dict_for_encoder)
                .expect("valid dictionary should attach")
                .expect("set_dictionary_from_bytes inserted previous dictionary")
                .id,
            dict_for_decoder.id
        );
        compressor.set_source(data.as_slice());
        compressor.set_drain(&mut with_dict);
        compressor.compress();

        // The frame header must advertise the dictionary id it depends on.
        let (frame_header, _) = crate::decoding::frame::read_frame_header(with_dict.as_slice())
            .expect("encoded stream should have a frame header");
        assert_eq!(frame_header.dictionary_id(), Some(dict_for_decoder.id));

        // Decoding without registering the dictionary must fail loudly.
        let mut decoder = FrameDecoder::new();
        let mut missing_dict_target = Vec::with_capacity(data.len());
        let err = decoder
            .decode_all_to_vec(&with_dict, &mut missing_dict_target)
            .unwrap_err();
        assert!(
            matches!(
                &err,
                crate::decoding::errors::FrameDecoderError::DictNotProvided { .. }
            ),
            "dict-compressed stream should require dictionary id, got: {err:?}"
        );

        // With the dictionary registered, our decoder roundtrips the payload.
        let mut decoder = FrameDecoder::new();
        decoder.add_dict(dict_for_decoder).unwrap();
        let mut decoded = Vec::with_capacity(data.len());
        decoder.decode_all_to_vec(&with_dict, &mut decoded).unwrap();
        assert_eq!(decoded, data);

        // Parity check against the reference implementation.
        let mut ffi_decoder = zstd::bulk::Decompressor::with_dictionary(dict_raw).unwrap();
        let mut ffi_decoded = Vec::with_capacity(data.len());
        let ffi_written = ffi_decoder
            .decompress_to_buffer(with_dict.as_slice(), &mut ffi_decoded)
            .unwrap();
        assert_eq!(ffi_written, data.len());
        assert_eq!(ffi_decoded, data);
    }
1140
    #[cfg(all(feature = "dict_builder", feature = "std"))]
    #[test]
    fn dictionary_compression_roundtrips_with_dict_builder_dictionary() {
        use std::io::Cursor;

        // Train a raw-content dictionary with the dict_builder on structured
        // sample lines.
        let mut training = Vec::new();
        for idx in 0..256u32 {
            training.extend_from_slice(
                format!("tenant=demo table=orders key={idx} region=eu\n").as_bytes(),
            );
        }
        let mut raw_dict = Vec::new();
        crate::dictionary::create_raw_dict_from_source(
            Cursor::new(training.as_slice()),
            training.len(),
            &mut raw_dict,
            4096,
        )
        .expect("dict_builder training should succeed");
        assert!(
            !raw_dict.is_empty(),
            "dict_builder produced an empty dictionary"
        );

        // Wrap the trained bytes as encoder/decoder dictionaries sharing a
        // fixed id.
        let dict_id = 0xD1C7_0008;
        let encoder_dict =
            crate::decoding::Dictionary::from_raw_content(dict_id, raw_dict.clone()).unwrap();
        let decoder_dict =
            crate::decoding::Dictionary::from_raw_content(dict_id, raw_dict.clone()).unwrap();

        // Payload shares structure with the training data, so the trained
        // dictionary should help.
        let mut payload = Vec::new();
        for idx in 0..96u32 {
            payload.extend_from_slice(
                format!(
                    "tenant=demo table=orders op=put key={idx} value=aaaaabbbbbcccccdddddeeeee\n"
                )
                .as_bytes(),
            );
        }

        // Baseline compression without the dictionary, for the size comparison
        // at the end.
        let mut without_dict = Vec::new();
        let mut baseline = FrameCompressor::new(super::CompressionLevel::Fastest);
        baseline.set_source(payload.as_slice());
        baseline.set_drain(&mut without_dict);
        baseline.compress();

        let mut with_dict = Vec::new();
        let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
        compressor
            .set_dictionary(encoder_dict)
            .expect("valid dict_builder dictionary should attach");
        compressor.set_source(payload.as_slice());
        compressor.set_drain(&mut with_dict);
        compressor.compress();

        // Header must advertise the dictionary id, and the dictionary-aware
        // decoder must roundtrip the payload.
        let (frame_header, _) = crate::decoding::frame::read_frame_header(with_dict.as_slice())
            .expect("encoded stream should have a frame header");
        assert_eq!(frame_header.dictionary_id(), Some(dict_id));
        let mut decoder = FrameDecoder::new();
        decoder.add_dict(decoder_dict).unwrap();
        let mut decoded = Vec::with_capacity(payload.len());
        decoder.decode_all_to_vec(&with_dict, &mut decoded).unwrap();
        assert_eq!(decoded, payload);
        assert!(
            with_dict.len() < without_dict.len(),
            "trained dictionary should improve compression for this small payload"
        );
    }
1209
1210    #[test]
1211    fn set_dictionary_from_bytes_seeds_entropy_tables_for_first_block() {
1212        let dict_raw = include_bytes!("../../dict_tests/dictionary");
1213        let mut output = Vec::new();
1214        let input = b"";
1215
1216        let mut compressor = FrameCompressor::new(super::CompressionLevel::Fastest);
1217        let previous = compressor
1218            .set_dictionary_from_bytes(dict_raw)
1219            .expect("dictionary bytes should parse");
1220        assert!(previous.is_none());
1221
1222        compressor.set_source(input.as_slice());
1223        compressor.set_drain(&mut output);
1224        compressor.compress();
1225
1226        assert!(
1227            compressor.state.last_huff_table.is_some(),
1228            "dictionary entropy should seed previous huffman table before first block"
1229        );
1230        assert!(
1231            compressor.state.fse_tables.ll_previous.is_some(),
1232            "dictionary entropy should seed previous ll table before first block"
1233        );
1234        assert!(
1235            compressor.state.fse_tables.ml_previous.is_some(),
1236            "dictionary entropy should seed previous ml table before first block"
1237        );
1238        assert!(
1239            compressor.state.fse_tables.of_previous.is_some(),
1240            "dictionary entropy should seed previous of table before first block"
1241        );
1242    }
1243
1244    #[test]
1245    fn set_dictionary_rejects_zero_dictionary_id() {
1246        let invalid = crate::decoding::Dictionary {
1247            id: 0,
1248            fse: crate::decoding::scratch::FSEScratch::new(),
1249            huf: crate::decoding::scratch::HuffmanScratch::new(),
1250            dict_content: vec![1, 2, 3],
1251            offset_hist: [1, 4, 8],
1252        };
1253
1254        let mut compressor: FrameCompressor<
1255            &[u8],
1256            Vec<u8>,
1257            crate::encoding::match_generator::MatchGeneratorDriver,
1258        > = FrameCompressor::new(super::CompressionLevel::Fastest);
1259        let result = compressor.set_dictionary(invalid);
1260        assert!(matches!(
1261            result,
1262            Err(crate::decoding::errors::DictionaryDecodeError::ZeroDictionaryId)
1263        ));
1264    }
1265
1266    #[test]
1267    fn set_dictionary_rejects_zero_repeat_offsets() {
1268        let invalid = crate::decoding::Dictionary {
1269            id: 1,
1270            fse: crate::decoding::scratch::FSEScratch::new(),
1271            huf: crate::decoding::scratch::HuffmanScratch::new(),
1272            dict_content: vec![1, 2, 3],
1273            offset_hist: [0, 4, 8],
1274        };
1275
1276        let mut compressor: FrameCompressor<
1277            &[u8],
1278            Vec<u8>,
1279            crate::encoding::match_generator::MatchGeneratorDriver,
1280        > = FrameCompressor::new(super::CompressionLevel::Fastest);
1281        let result = compressor.set_dictionary(invalid);
1282        assert!(matches!(
1283            result,
1284            Err(
1285                crate::decoding::errors::DictionaryDecodeError::ZeroRepeatOffsetInDictionary {
1286                    index: 0
1287                }
1288            )
1289        ));
1290    }
1291
1292    #[test]
1293    fn uncompressed_mode_does_not_require_dictionary() {
1294        let dict_id = 0xABCD_0001;
1295        let dict =
1296            crate::decoding::Dictionary::from_raw_content(dict_id, b"shared-history".to_vec())
1297                .expect("raw dictionary should be valid");
1298
1299        let payload = b"plain-bytes-that-should-stay-raw";
1300        let mut output = Vec::new();
1301        let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
1302        compressor
1303            .set_dictionary(dict)
1304            .expect("dictionary should attach in uncompressed mode");
1305        compressor.set_source(payload.as_slice());
1306        compressor.set_drain(&mut output);
1307        compressor.compress();
1308
1309        let (frame_header, _) = crate::decoding::frame::read_frame_header(output.as_slice())
1310            .expect("encoded frame should have a header");
1311        assert_eq!(
1312            frame_header.dictionary_id(),
1313            None,
1314            "raw/uncompressed frames must not advertise dictionary dependency"
1315        );
1316
1317        let mut decoder = FrameDecoder::new();
1318        let mut decoded = Vec::with_capacity(payload.len());
1319        decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
1320        assert_eq!(decoded, payload);
1321    }
1322
    #[test]
    fn dictionary_roundtrip_stays_valid_after_output_exceeds_window() {
        use crate::encoding::match_generator::MatchGeneratorDriver;

        // Attaching a dictionary primes the matcher with extra history. Check
        // that this priming neither inflates the advertised window size nor
        // corrupts decoding once output has moved past the first window.
        let dict_id = 0xABCD_0002;
        let dict = crate::decoding::Dictionary::from_raw_content(dict_id, b"abcdefgh".to_vec())
            .expect("raw dictionary should be valid");
        let dict_for_decoder =
            crate::decoding::Dictionary::from_raw_content(dict_id, b"abcdefgh".to_vec())
                .expect("raw dictionary should be valid");

        // Payload must exceed the encoder's advertised window (128 KiB for
        // Fastest) so the test actually exercises cross-window-boundary behavior.
        let payload = b"abcdefgh".repeat(128 * 1024 / 8 + 64);
        let matcher = MatchGeneratorDriver::new(1024, 1);

        // Baseline: same payload without a dictionary, to capture the window
        // size the encoder advertises on its own.
        let mut no_dict_output = Vec::new();
        let mut no_dict_compressor =
            FrameCompressor::new_with_matcher(matcher, super::CompressionLevel::Fastest);
        no_dict_compressor.set_source(payload.as_slice());
        no_dict_compressor.set_drain(&mut no_dict_output);
        no_dict_compressor.compress();
        let (no_dict_frame_header, _) =
            crate::decoding::frame::read_frame_header(no_dict_output.as_slice())
                .expect("baseline frame should have a header");
        let no_dict_window = no_dict_frame_header
            .window_size()
            .expect("window size should be present");

        // Dictionary-primed run with a fresh matcher of the same configuration.
        let mut output = Vec::new();
        let matcher = MatchGeneratorDriver::new(1024, 1);
        let mut compressor =
            FrameCompressor::new_with_matcher(matcher, super::CompressionLevel::Fastest);
        compressor
            .set_dictionary(dict)
            .expect("dictionary should attach");
        compressor.set_source(payload.as_slice());
        compressor.set_drain(&mut output);
        compressor.compress();

        let (frame_header, _) = crate::decoding::frame::read_frame_header(output.as_slice())
            .expect("encoded frame should have a header");
        let advertised_window = frame_header
            .window_size()
            .expect("window size should be present");
        assert_eq!(
            advertised_window, no_dict_window,
            "dictionary priming must not inflate advertised window size"
        );
        assert!(
            payload.len() > advertised_window as usize,
            "test must cross the advertised window boundary"
        );

        // Full roundtrip with the dictionary registered on the decoder side.
        let mut decoder = FrameDecoder::new();
        decoder.add_dict(dict_for_decoder).unwrap();
        let mut decoded = Vec::with_capacity(payload.len());
        decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }
1383
    #[test]
    fn source_size_hint_with_dictionary_keeps_roundtrip_and_nonincreasing_window() {
        // An undersized source-size hint (1 byte for a multi-KiB payload)
        // combined with dictionary priming must still roundtrip correctly and
        // must not advertise a larger window than the un-hinted encoder.
        let dict_id = 0xABCD_0004;
        let dict_content = b"abcd".repeat(1024); // 4 KiB dictionary history
        let dict = crate::decoding::Dictionary::from_raw_content(dict_id, dict_content).unwrap();
        let dict_for_decoder =
            crate::decoding::Dictionary::from_raw_content(dict_id, b"abcd".repeat(1024)).unwrap();
        let payload = b"abcdabcdabcdabcd".repeat(128);

        // Hinted encoder: hint is deliberately smaller than the real payload.
        let mut hinted_output = Vec::new();
        let mut hinted = FrameCompressor::new(super::CompressionLevel::Fastest);
        hinted.set_dictionary(dict).unwrap();
        hinted.set_source_size_hint(1);
        hinted.set_source(payload.as_slice());
        hinted.set_drain(&mut hinted_output);
        hinted.compress();

        // Baseline encoder: same dictionary and payload, no size hint.
        let mut no_hint_output = Vec::new();
        let mut no_hint = FrameCompressor::new(super::CompressionLevel::Fastest);
        no_hint
            .set_dictionary(
                crate::decoding::Dictionary::from_raw_content(dict_id, b"abcd".repeat(1024))
                    .unwrap(),
            )
            .unwrap();
        no_hint.set_source(payload.as_slice());
        no_hint.set_drain(&mut no_hint_output);
        no_hint.compress();

        // Compare the advertised window sizes from the two frame headers.
        let hinted_window = crate::decoding::frame::read_frame_header(hinted_output.as_slice())
            .expect("encoded frame should have a header")
            .0
            .window_size()
            .expect("window size should be present");
        let no_hint_window = crate::decoding::frame::read_frame_header(no_hint_output.as_slice())
            .expect("encoded frame should have a header")
            .0
            .window_size()
            .expect("window size should be present");
        assert!(
            hinted_window <= no_hint_window,
            "source-size hint should not increase advertised window with dictionary priming",
        );

        // The hinted stream must still decode back to the full payload.
        let mut decoder = FrameDecoder::new();
        decoder.add_dict(dict_for_decoder).unwrap();
        let mut decoded = Vec::with_capacity(payload.len());
        decoder
            .decode_all_to_vec(&hinted_output, &mut decoded)
            .unwrap();
        assert_eq!(decoded, payload);
    }
1436
    #[test]
    fn source_size_hint_with_dictionary_keeps_roundtrip_for_larger_payload() {
        // Companion to the test above, with an ACCURATE size hint on a larger
        // payload: the hint must still not enlarge the advertised window and
        // the roundtrip must be exact.
        let dict_id = 0xABCD_0005;
        let dict_content = b"abcd".repeat(1024); // 4 KiB dictionary history
        let dict = crate::decoding::Dictionary::from_raw_content(dict_id, dict_content).unwrap();
        let dict_for_decoder =
            crate::decoding::Dictionary::from_raw_content(dict_id, b"abcd".repeat(1024)).unwrap();
        let payload = b"abcd".repeat(1024); // 4 KiB payload
        let payload_len = payload.len() as u64;

        // Hinted encoder: hint exactly matches the payload length.
        let mut hinted_output = Vec::new();
        let mut hinted = FrameCompressor::new(super::CompressionLevel::Fastest);
        hinted.set_dictionary(dict).unwrap();
        hinted.set_source_size_hint(payload_len);
        hinted.set_source(payload.as_slice());
        hinted.set_drain(&mut hinted_output);
        hinted.compress();

        // Baseline encoder: same dictionary and payload, no size hint.
        let mut no_hint_output = Vec::new();
        let mut no_hint = FrameCompressor::new(super::CompressionLevel::Fastest);
        no_hint
            .set_dictionary(
                crate::decoding::Dictionary::from_raw_content(dict_id, b"abcd".repeat(1024))
                    .unwrap(),
            )
            .unwrap();
        no_hint.set_source(payload.as_slice());
        no_hint.set_drain(&mut no_hint_output);
        no_hint.compress();

        // Compare the advertised window sizes from the two frame headers.
        let hinted_window = crate::decoding::frame::read_frame_header(hinted_output.as_slice())
            .expect("encoded frame should have a header")
            .0
            .window_size()
            .expect("window size should be present");
        let no_hint_window = crate::decoding::frame::read_frame_header(no_hint_output.as_slice())
            .expect("encoded frame should have a header")
            .0
            .window_size()
            .expect("window size should be present");
        assert!(
            hinted_window <= no_hint_window,
            "source-size hint should not increase advertised window with dictionary priming",
        );

        // The hinted stream must still decode back to the full payload.
        let mut decoder = FrameDecoder::new();
        decoder.add_dict(dict_for_decoder).unwrap();
        let mut decoded = Vec::with_capacity(payload.len());
        decoder
            .decode_all_to_vec(&hinted_output, &mut decoded)
            .unwrap();
        assert_eq!(decoded, payload);
    }
1490
1491    #[test]
1492    fn custom_matcher_without_dictionary_priming_does_not_advertise_dict_id() {
1493        let dict_id = 0xABCD_0003;
1494        let dict = crate::decoding::Dictionary::from_raw_content(dict_id, b"abcdefgh".to_vec())
1495            .expect("raw dictionary should be valid");
1496        let payload = b"abcdefghabcdefgh";
1497
1498        let mut output = Vec::new();
1499        let matcher = NoDictionaryMatcher::new(64);
1500        let mut compressor =
1501            FrameCompressor::new_with_matcher(matcher, super::CompressionLevel::Fastest);
1502        compressor
1503            .set_dictionary(dict)
1504            .expect("dictionary should attach");
1505        compressor.set_source(payload.as_slice());
1506        compressor.set_drain(&mut output);
1507        compressor.compress();
1508
1509        let (frame_header, _) = crate::decoding::frame::read_frame_header(output.as_slice())
1510            .expect("encoded frame should have a header");
1511        assert_eq!(
1512            frame_header.dictionary_id(),
1513            None,
1514            "matchers that do not support dictionary priming must not advertise dictionary dependency"
1515        );
1516
1517        let mut decoder = FrameDecoder::new();
1518        let mut decoded = Vec::with_capacity(payload.len());
1519        decoder.decode_all_to_vec(&output, &mut decoded).unwrap();
1520        assert_eq!(decoded, payload);
1521    }
1522
1523    #[cfg(feature = "hash")]
1524    #[test]
1525    fn checksum_two_frames_reused_compressor() {
1526        // Compress the same data twice using the same compressor and verify that:
1527        // 1. The checksum written in each frame matches what the decoder calculates.
1528        // 2. The hasher is correctly reset between frames (no cross-contamination).
1529        //    If the hasher were NOT reset, the second frame's calculated checksum
1530        //    would differ from the one stored in the frame data, causing assert_eq to fail.
1531        let data: Vec<u8> = (0u8..=255).cycle().take(1024).collect();
1532
1533        let mut compressor = FrameCompressor::new(super::CompressionLevel::Uncompressed);
1534
1535        // --- Frame 1 ---
1536        let mut compressed1 = Vec::new();
1537        compressor.set_source(data.as_slice());
1538        compressor.set_drain(&mut compressed1);
1539        compressor.compress();
1540
1541        // --- Frame 2 (reuse the same compressor) ---
1542        let mut compressed2 = Vec::new();
1543        compressor.set_source(data.as_slice());
1544        compressor.set_drain(&mut compressed2);
1545        compressor.compress();
1546
1547        fn decode_and_collect(compressed: &[u8]) -> (Vec<u8>, Option<u32>, Option<u32>) {
1548            let mut decoder = FrameDecoder::new();
1549            let mut source = compressed;
1550            decoder.reset(&mut source).unwrap();
1551            while !decoder.is_finished() {
1552                decoder
1553                    .decode_blocks(&mut source, crate::decoding::BlockDecodingStrategy::All)
1554                    .unwrap();
1555            }
1556            let mut decoded = Vec::new();
1557            decoder.collect_to_writer(&mut decoded).unwrap();
1558            (
1559                decoded,
1560                decoder.get_checksum_from_data(),
1561                decoder.get_calculated_checksum(),
1562            )
1563        }
1564
1565        let (decoded1, chksum_from_data1, chksum_calculated1) = decode_and_collect(&compressed1);
1566        assert_eq!(decoded1, data, "frame 1: decoded data mismatch");
1567        assert_eq!(
1568            chksum_from_data1, chksum_calculated1,
1569            "frame 1: checksum mismatch"
1570        );
1571
1572        let (decoded2, chksum_from_data2, chksum_calculated2) = decode_and_collect(&compressed2);
1573        assert_eq!(decoded2, data, "frame 2: decoded data mismatch");
1574        assert_eq!(
1575            chksum_from_data2, chksum_calculated2,
1576            "frame 2: checksum mismatch"
1577        );
1578
1579        // Same data compressed twice must produce the same checksum.
1580        // If state leaked across frames, the second calculated checksum would differ.
1581        assert_eq!(
1582            chksum_from_data1, chksum_from_data2,
1583            "frame 1 and frame 2 should have the same checksum (same data, hash must reset per frame)"
1584        );
1585    }
1586
1587    #[cfg(feature = "std")]
1588    #[test]
1589    fn fuzz_targets() {
1590        use std::io::Read;
1591        fn decode_szstd(data: &mut dyn std::io::Read) -> Vec<u8> {
1592            let mut decoder = crate::decoding::StreamingDecoder::new(data).unwrap();
1593            let mut result: Vec<u8> = Vec::new();
1594            decoder.read_to_end(&mut result).expect("Decoding failed");
1595            result
1596        }
1597
1598        fn decode_szstd_writer(mut data: impl Read) -> Vec<u8> {
1599            let mut decoder = crate::decoding::FrameDecoder::new();
1600            decoder.reset(&mut data).unwrap();
1601            let mut result = vec![];
1602            while !decoder.is_finished() || decoder.can_collect() > 0 {
1603                decoder
1604                    .decode_blocks(
1605                        &mut data,
1606                        crate::decoding::BlockDecodingStrategy::UptoBytes(1024 * 1024),
1607                    )
1608                    .unwrap();
1609                decoder.collect_to_writer(&mut result).unwrap();
1610            }
1611            result
1612        }
1613
1614        fn encode_zstd(data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
1615            zstd::stream::encode_all(std::io::Cursor::new(data), 3)
1616        }
1617
1618        fn encode_szstd_uncompressed(data: &mut dyn std::io::Read) -> Vec<u8> {
1619            let mut input = Vec::new();
1620            data.read_to_end(&mut input).unwrap();
1621
1622            crate::encoding::compress_to_vec(
1623                input.as_slice(),
1624                crate::encoding::CompressionLevel::Uncompressed,
1625            )
1626        }
1627
1628        fn encode_szstd_compressed(data: &mut dyn std::io::Read) -> Vec<u8> {
1629            let mut input = Vec::new();
1630            data.read_to_end(&mut input).unwrap();
1631
1632            crate::encoding::compress_to_vec(
1633                input.as_slice(),
1634                crate::encoding::CompressionLevel::Fastest,
1635            )
1636        }
1637
1638        fn decode_zstd(data: &[u8]) -> Result<Vec<u8>, std::io::Error> {
1639            let mut output = Vec::new();
1640            zstd::stream::copy_decode(data, &mut output)?;
1641            Ok(output)
1642        }
1643        if std::fs::exists("fuzz/artifacts/interop").unwrap_or(false) {
1644            for file in std::fs::read_dir("fuzz/artifacts/interop").unwrap() {
1645                if file.as_ref().unwrap().file_type().unwrap().is_file() {
1646                    let data = std::fs::read(file.unwrap().path()).unwrap();
1647                    let data = data.as_slice();
1648                    // Decoding
1649                    let compressed = encode_zstd(data).unwrap();
1650                    let decoded = decode_szstd(&mut compressed.as_slice());
1651                    let decoded2 = decode_szstd_writer(&mut compressed.as_slice());
1652                    assert!(
1653                        decoded == data,
1654                        "Decoded data did not match the original input during decompression"
1655                    );
1656                    assert_eq!(
1657                        decoded2, data,
1658                        "Decoded data did not match the original input during decompression"
1659                    );
1660
1661                    // Encoding
1662                    // Uncompressed encoding
1663                    let mut input = data;
1664                    let compressed = encode_szstd_uncompressed(&mut input);
1665                    let decoded = decode_zstd(&compressed).unwrap();
1666                    assert_eq!(
1667                        decoded, data,
1668                        "Decoded data did not match the original input during compression"
1669                    );
1670                    // Compressed encoding
1671                    let mut input = data;
1672                    let compressed = encode_szstd_compressed(&mut input);
1673                    let decoded = decode_zstd(&compressed).unwrap();
1674                    assert_eq!(
1675                        decoded, data,
1676                        "Decoded data did not match the original input during compression"
1677                    );
1678                }
1679            }
1680        }
1681    }
1682}