Skip to main content

structured_zstd/encoding/
streaming_encoder.rs

1use alloc::format;
2use alloc::string::{String, ToString};
3use alloc::vec::Vec;
4use core::mem;
5
6use crate::common::MAX_BLOCK_SIZE;
7#[cfg(feature = "hash")]
8use core::hash::Hasher;
9#[cfg(feature = "hash")]
10use twox_hash::XxHash64;
11
12use crate::encoding::levels::compress_block_encoded;
13use crate::encoding::{
14    CompressionLevel, EncoderDictionary, MatchGeneratorDriver, Matcher, block_header::BlockHeader,
15    frame_compressor::CachedDictionaryEntropy, frame_compressor::CompressState,
16    frame_compressor::FseTables, frame_compressor::PreviousFseTable, frame_header::FrameHeader,
17};
18use crate::io::{Error, ErrorKind, Write};
19
/// Incremental frame encoder that implements [`Write`].
///
/// Data can be provided with multiple `write()` calls. Full blocks are compressed
/// automatically, `flush()` emits the currently buffered partial block as non-last,
/// and `finish()` closes the frame and returns the wrapped writer.
///
/// All frame-level options (checksum, magicless, pledged size, dictionary, ...)
/// are fixed once the frame header has been written; the configuration setters
/// return an error when called after that point.
pub struct StreamingEncoder<W: Write, M: Matcher = MatchGeneratorDriver> {
    /// Output sink for the encoded frame. Populated by the constructor;
    /// `finish` takes it out to hand ownership back to the caller.
    drain: Option<W>,
    /// Level used for the frame; replaced by `set_parameters`.
    compression_level: CompressionLevel,
    /// Matcher plus the retained Huffman / FSE tables, block scratch,
    /// repeat-offset history and strategy gates.
    state: CompressState<M>,
    /// Uncompressed input staged for the next block.
    pending: Vec<u8>,
    /// Reusable output buffer; `encode_block` takes it as the destination
    /// for the encoded block.
    encoded_scratch: Vec<u8>,
    /// Poison flag: when set, `ensure_open` rejects further operations with
    /// the sticky error.
    errored: bool,
    /// Kind of the previous failure, replayed by `sticky_error`.
    last_error_kind: Option<ErrorKind>,
    /// Message of the previous failure, replayed by `sticky_error`.
    last_error_message: Option<String>,
    /// Set once the frame header has been written to the drain.
    frame_started: bool,
    /// Upper bound on emitted block sizes (upstream `ZSTD_c_targetCBlockSize`
    /// semantics; see `FrameCompressor::set_target_block_size`). `None` =
    /// the format's 128 KiB ceiling.
    target_block_size: Option<u32>,
    /// Exact content size promised via `set_pledged_content_size`; `finish`
    /// rejects the frame when `bytes_consumed` differs from it.
    pledged_content_size: Option<u64>,
    /// Advisory source-size hint from [`set_source_size_hint`](Self::set_source_size_hint).
    /// Unlike `pledged_content_size` it carries no end-of-frame enforcement, but
    /// it still feeds the small-input gates (matcher sizing AND the Fast HUF
    /// fast-path gate) so `set_source_size_hint(small)` reduces work the same way
    /// a pledge does. The HUF gate reads `pledged_content_size.or(source_size_hint)`.
    source_size_hint: Option<u64>,
    /// Whether a pledged size is written into the header's
    /// `Frame_Content_Size` field (upstream `ZSTD_c_contentSizeFlag`).
    /// Pledge *enforcement* is independent of this flag — upstream
    /// validates consumed bytes against the pledge at frame end even
    /// when the header omits the field. Default `true`.
    content_size_flag: bool,
    /// Byte counter compared against the pledge in `finish`.
    /// NOTE(review): presumably advanced by the write path, which is outside
    /// this part of the file — confirm.
    bytes_consumed: u64,
    /// Effective strategy tag when a public-parameter
    /// [`Strategy`](crate::encoding::Strategy) override (#27) is active, mirroring
    /// [`FrameCompressor`](crate::encoding::FrameCompressor)'s field. `Some`
    /// survives frame start so the literal-compression gates run the same
    /// strategy the matcher does; `None` keeps the level-derived tag.
    strategy_override: Option<crate::encoding::strategy::StrategyTag>,
    /// `ZSTD_f_zstd1_magicless` — omit the 4-byte magic number prefix.
    /// Default false. See [`Self::set_magicless`].
    magicless: bool,
    /// Whether to emit a trailing XXH64 content checksum and set the frame
    /// header's `Content_Checksum_flag` (upstream `ZSTD_c_checksumFlag`).
    /// Default `false`, matching the upstream library default; combined with
    /// the `hash` feature, so without `hash` no checksum is emitted
    /// regardless. See [`Self::set_content_checksum`].
    content_checksum: bool,
    /// Dictionary applied to the frame (upstream zstd `ZSTD_CCtx_loadDictionary` on a
    /// streaming context). `None` = no dictionary. Set before the first write.
    dictionary: Option<EncoderDictionary>,
    /// Whether the frame header records the attached dictionary's ID
    /// (upstream `ZSTD_c_dictIDFlag`). Default `true`. Raw-content
    /// dictionaries (upstream `ZSTD_CCtx_refPrefix`) carry a synthetic
    /// non-zero ID that must not reach the wire, so their attach path
    /// turns this off. See [`Self::set_dictionary_id_flag`].
    dictionary_id_flag: bool,
    /// Encoder entropy tables (literals Huffman + LL/ML/OF FSE "previous"
    /// tables) the dictionary seeds into the first block, derived once when the
    /// dictionary is attached so each frame start is a cheap clone.
    dictionary_entropy_cache: Option<CachedDictionaryEntropy>,
    /// XXH64 state (seed 0), re-created at frame start; `finish` writes its
    /// low 32 bits as the trailing checksum when `content_checksum` is on.
    #[cfg(feature = "hash")]
    hasher: XxHash64,
}
84
85impl<W: Write> StreamingEncoder<W, MatchGeneratorDriver> {
86    /// Creates a streaming encoder backed by the default match generator.
87    ///
88    /// The encoder writes compressed bytes into `drain` and applies `compression_level`
89    /// to all subsequently written blocks.
90    pub fn new(drain: W, compression_level: CompressionLevel) -> Self {
91        Self::new_with_matcher(
92            MatchGeneratorDriver::new(MAX_BLOCK_SIZE as usize, 1),
93            drain,
94            compression_level,
95        )
96    }
97
98    /// Configure fine-grained compression parameters (#27): resets the level to
99    /// the parameters' level and installs the per-knob overrides (window / hash
100    /// / chain / search logs, strategy, long-distance matching) applied at the
101    /// next frame. Mirrors [`FrameCompressor::set_parameters`]. Must be called
102    /// before the first [`write`](Write::write). Only the built-in
103    /// `MatchGeneratorDriver` exposes the override knobs, so this lives on the
104    /// default-matcher impl.
105    pub fn set_parameters(
106        &mut self,
107        params: &crate::encoding::CompressionParameters,
108    ) -> Result<(), Error> {
109        self.ensure_open()?;
110        if self.frame_started {
111            return Err(invalid_input_error(
112                "compression parameters must be set before the first write",
113            ));
114        }
115        self.compression_level = params.level();
116        let overrides = params.overrides();
117        // Persist the strategy override so `ensure_frame_started`'s level-based
118        // resync does not discard it (matching `FrameCompressor::set_parameters`).
119        self.strategy_override = overrides.strategy.map(|s| s.tag());
120        self.state.strategy_tag = self.strategy_override.unwrap_or_else(|| {
121            crate::encoding::strategy::StrategyTag::for_compression_level(self.compression_level)
122        });
123        self.state.huf_optimal_search = crate::encoding::frame_compressor::fast_huf_search_enabled(
124            self.state.strategy_tag,
125            self.pledged_content_size.or(self.source_size_hint),
126        );
127        self.state.matcher.set_param_overrides(Some(overrides));
128        Ok(())
129    }
130}
131
132impl<W: Write, M: Matcher> StreamingEncoder<W, M> {
133    /// Creates a streaming encoder with an explicitly provided matcher implementation.
134    ///
135    /// This constructor is primarily intended for tests and advanced callers that need
136    /// custom match-window behavior.
137    pub fn new_with_matcher(matcher: M, drain: W, compression_level: CompressionLevel) -> Self {
138        Self {
139            drain: Some(drain),
140            compression_level,
141            state: CompressState {
142                matcher,
143                last_huff_table: None,
144                huff_table_spare: None,
145                fse_tables: FseTables::new(),
146                block_scratch: crate::encoding::blocks::CompressedBlockScratch::new(),
147                offset_hist: [1, 4, 8],
148                strategy_tag: crate::encoding::strategy::StrategyTag::for_compression_level(
149                    compression_level,
150                ),
151                huf_optimal_search: true,
152            },
153            pending: Vec::new(),
154            encoded_scratch: Vec::new(),
155            errored: false,
156            last_error_kind: None,
157            last_error_message: None,
158            frame_started: false,
159            target_block_size: None,
160            pledged_content_size: None,
161            source_size_hint: None,
162            content_size_flag: true,
163            bytes_consumed: 0,
164            strategy_override: None,
165            magicless: false,
166            content_checksum: false,
167            dictionary: None,
168            dictionary_id_flag: true,
169            dictionary_entropy_cache: None,
170            #[cfg(feature = "hash")]
171            hasher: XxHash64::with_seed(0),
172        }
173    }
174
175    /// Set an upper bound on each physical block's payload (semantics of
176    /// upstream `ZSTD_c_targetCBlockSize`): every block carries at most
177    /// `target` payload bytes, +3-byte block header on the wire — the
178    /// upstream knob is likewise a convergence target for block sizing,
179    /// not a cap on header-inclusive wire bytes. Clamped to
180    /// `[MIN_TARGET_BLOCK_SIZE, MAX_BLOCK_SIZE]`; mirrors
181    /// `FrameCompressor::set_target_block_size`. Must be set before the
182    /// first write.
183    pub fn set_target_block_size(&mut self, target: Option<u32>) -> Result<(), Error> {
184        self.ensure_open()?;
185        if self.frame_started {
186            return Err(invalid_input_error(
187                "the block-size target must be set before the first write",
188            ));
189        }
190        self.target_block_size = target.map(|t| {
191            t.clamp(
192                crate::common::MIN_TARGET_BLOCK_SIZE,
193                crate::common::MAX_BLOCK_SIZE,
194            )
195        });
196        Ok(())
197    }
198
199    /// Enable or disable the trailing XXH64 content checksum
200    /// (upstream `ZSTD_c_checksumFlag`). Default `false`, matching the
201    /// upstream library default (`ZSTD_c_checksumFlag = 0`). Must be called
202    /// before the first [`write`](Write::write); once the frame header is
203    /// emitted the flag is fixed, so a late change returns an error rather
204    /// than producing a header/trailer mismatch. Without the `hash` feature
205    /// no checksum is emitted regardless.
206    pub fn set_content_checksum(&mut self, emit: bool) -> Result<(), Error> {
207        self.ensure_open()?;
208        if self.frame_started {
209            return Err(invalid_input_error(
210                "content checksum must be set before the first write",
211            ));
212        }
213        self.content_checksum = emit;
214        Ok(())
215    }
216
217    /// Enable or disable magicless frame format (`ZSTD_f_zstd1_magicless`).
218    ///
219    /// When set to `true`, the frame header serialized by this encoder
220    /// omits the 4-byte magic number prefix. Must be called BEFORE the
221    /// first [`write`](Write::write) call; calling it after the frame
222    /// header has already been emitted returns an error so the caller
223    /// can't be misled into thinking they produced a magicless stream.
224    pub fn set_magicless(&mut self, magicless: bool) -> Result<(), Error> {
225        self.ensure_open()?;
226        if self.frame_started {
227            return Err(invalid_input_error(
228                "magicless format must be set before the first write",
229            ));
230        }
231        self.magicless = magicless;
232        Ok(())
233    }
234
235    /// Pledge the total uncompressed content size for this frame.
236    ///
237    /// When set, the frame header will include a `Frame_Content_Size` field.
238    /// This enables decoders to pre-allocate output buffers.
239    /// The pledged size is also forwarded as a source-size hint to the
240    /// matcher so small inputs can use smaller matching tables.
241    ///
242    /// Must be called **before** the first [`write`](Write::write) call;
243    /// calling it after the frame header has already been emitted returns an
244    /// error.
245    pub fn set_pledged_content_size(&mut self, size: u64) -> Result<(), Error> {
246        self.ensure_open()?;
247        if self.frame_started {
248            return Err(invalid_input_error(
249                "pledged content size must be set before the first write",
250            ));
251        }
252        self.pledged_content_size = Some(size);
253        // Also use pledged size as source-size hint so the matcher
254        // can select smaller tables for small inputs.
255        self.state.matcher.set_source_size_hint(size);
256        Ok(())
257    }
258
259    /// Control whether the pledged size is written into the header's
260    /// `Frame_Content_Size` field (upstream `ZSTD_c_contentSizeFlag`,
261    /// default on). With the flag off the header omits the field, but a
262    /// pledge set via [`set_pledged_content_size`](Self::set_pledged_content_size)
263    /// is still enforced against the bytes actually written. Must be
264    /// called before the first [`write`](Write::write).
265    pub fn set_content_size_flag(&mut self, emit: bool) -> Result<(), Error> {
266        self.ensure_open()?;
267        if self.frame_started {
268            return Err(invalid_input_error(
269                "content size flag must be set before the first write",
270            ));
271        }
272        self.content_size_flag = emit;
273        Ok(())
274    }
275
276    /// Provide a hint about the total uncompressed size for the next frame.
277    ///
278    /// Unlike [`set_pledged_content_size`](Self::set_pledged_content_size),
279    /// this does **not** enforce that exactly `size` bytes are written; it
280    /// may reduce matcher tables, advertised frame window, and block sizing
281    /// for small inputs. Must be called before the first
282    /// [`write`](Write::write).
283    pub fn set_source_size_hint(&mut self, size: u64) -> Result<(), Error> {
284        self.ensure_open()?;
285        if self.frame_started {
286            return Err(invalid_input_error(
287                "source size hint must be set before the first write",
288            ));
289        }
290        self.state.matcher.set_source_size_hint(size);
291        // Feed the same hint to the Fast HUF fast-path gate (resolved in
292        // `set_parameters` / `ensure_frame_started` via
293        // `pledged_content_size.or(source_size_hint)`), so a small advisory size
294        // also lifts Fast streams off the expensive optimal-HUF search.
295        self.source_size_hint = Some(size);
296        Ok(())
297    }
298
299    /// Attach a serialized dictionary blob to the frame (upstream zstd
300    /// `ZSTD_CCtx_loadDictionary` on a streaming context). The dictionary primes
301    /// the match-finder and seeds the first block's entropy tables + repeat
302    /// offsets, and its ID is written into the frame header. Must be called
303    /// before the first [`write`](Write::write); the parsed dictionary must have
304    /// a non-zero ID and non-zero repeat offsets.
305    pub fn set_dictionary_from_bytes(&mut self, raw_dictionary: &[u8]) -> Result<(), Error> {
306        let dict = EncoderDictionary::from_bytes(raw_dictionary)
307            .map_err(|err| invalid_input_error(&alloc::format!("invalid dictionary: {err:?}")))?;
308        self.set_encoder_dictionary(dict)
309    }
310
311    /// Whether the frame header records the dictionary ID when a dictionary
312    /// is attached (upstream `ZSTD_c_dictIDFlag` semantics; default `true`).
313    /// Mirrors [`FrameCompressor::set_dictionary_id_flag`]. Decoders can still
314    /// decode such frames by supplying the dictionary explicitly.
315    pub fn set_dictionary_id_flag(&mut self, emit: bool) -> Result<(), Error> {
316        self.ensure_open()?;
317        if self.frame_started {
318            return Err(invalid_input_error(
319                "dictionary ID flag must be set before the first write",
320            ));
321        }
322        self.dictionary_id_flag = emit;
323        Ok(())
324    }
325
326    /// Attach an already-parsed [`EncoderDictionary`] to the frame. See
327    /// [`set_dictionary_from_bytes`](Self::set_dictionary_from_bytes); must be
328    /// called before the first write.
329    pub fn set_encoder_dictionary(&mut self, dict: EncoderDictionary) -> Result<(), Error> {
330        self.ensure_open()?;
331        if self.frame_started {
332            return Err(invalid_input_error(
333                "dictionary must be attached before the first write",
334            ));
335        }
336        let inner = &dict.inner;
337        if inner.id == 0 {
338            return Err(invalid_input_error("dictionary has a zero ID"));
339        }
340        if inner.offset_hist.contains(&0) {
341            return Err(invalid_input_error(
342                "dictionary carries a zero repeat offset",
343            ));
344        }
345        self.dictionary_entropy_cache = Some(CachedDictionaryEntropy::from_dictionary(inner));
346        self.dictionary = Some(dict);
347        Ok(())
348    }
349
350    /// Returns an immutable reference to the wrapped output drain.
351    ///
352    /// The drain remains available for the encoder lifetime; [`finish`](Self::finish)
353    /// consumes the encoder and returns ownership of the drain.
354    pub fn get_ref(&self) -> &W {
355        self.drain
356            .as_ref()
357            .expect("streaming encoder drain is present until finish consumes self")
358    }
359
360    /// Total heap bytes this encoder's allocations hold, excluding the
361    /// inline struct and the drain `W` (whose footprint the owner can
362    /// measure through [`get_ref`](Self::get_ref)): match-finder tables /
363    /// history / recycled buffers, retained Huffman tables, the staging
364    /// `pending` / `encoded_scratch` buffers, the retained dictionary
365    /// content, and the cached dictionary entropy tables. Mirrors
366    /// `FrameCompressor::heap_size` so a context can report its true
367    /// footprint through `ZSTD_sizeof_CCtx`.
368    pub fn heap_size(&self) -> usize {
369        let mut total = self.state.matcher.heap_size();
370        total += self
371            .state
372            .last_huff_table
373            .as_ref()
374            .map_or(0, |table| table.heap_size());
375        total += self
376            .state
377            .huff_table_spare
378            .as_ref()
379            .map_or(0, |table| table.heap_size());
380        total += self.pending.capacity();
381        total += self.encoded_scratch.capacity();
382        total += self
383            .dictionary
384            .as_ref()
385            .map_or(0, |d| d.inner.dict_content.capacity());
386        total += self
387            .dictionary_entropy_cache
388            .as_ref()
389            .map_or(0, CachedDictionaryEntropy::heap_size);
390        total
391    }
392
393    /// Returns a mutable reference to the wrapped output drain.
394    ///
395    /// It is inadvisable to directly write to the underlying writer, as doing
396    /// so would corrupt the zstd frame being assembled by the encoder.
397    ///
398    /// The drain remains available for the encoder lifetime; [`finish`](Self::finish)
399    /// consumes the encoder and returns ownership of the drain.
400    pub fn get_mut(&mut self) -> &mut W {
401        self.drain
402            .as_mut()
403            .expect("streaming encoder drain is present until finish consumes self")
404    }
405
406    /// Finalizes the current zstd frame and returns the wrapped output drain.
407    ///
408    /// If no payload was written yet, this still emits a valid empty frame.
409    /// Calling this method consumes the encoder.
410    pub fn finish(mut self) -> Result<W, Error> {
411        self.ensure_open()?;
412
413        // Validate the pledge before finalizing the frame. If finish() is
414        // called before any writes, this also avoids emitting a header with
415        // an incorrect FCS into the drain on mismatch.
416        if let Some(pledged) = self.pledged_content_size
417            && self.bytes_consumed != pledged
418        {
419            return Err(invalid_input_error(
420                "pledged content size does not match bytes consumed",
421            ));
422        }
423
424        self.ensure_frame_started()?;
425
426        if self.pending.is_empty() {
427            self.write_empty_last_block()
428                .map_err(|err| self.fail(err))?;
429        } else {
430            self.emit_pending_block(true)?;
431        }
432
433        let mut drain = self
434            .drain
435            .take()
436            .expect("streaming encoder drain must be present when finishing");
437
438        #[cfg(feature = "hash")]
439        if self.content_checksum {
440            let checksum = self.hasher.finish() as u32;
441            drain
442                .write_all(&checksum.to_le_bytes())
443                .map_err(|err| self.fail(err))?;
444        }
445
446        drain.flush().map_err(|err| self.fail(err))?;
447        Ok(drain)
448    }
449
450    fn ensure_open(&self) -> Result<(), Error> {
451        if self.errored {
452            return Err(self.sticky_error());
453        }
454        Ok(())
455    }
456
457    // Cold path (only reached after poisoning). The format!() calls still allocate
458    // in no_std even though error_with_kind_message/other_error_owned drop the
459    // message; this is acceptable on an error recovery path to keep match arms simple.
460    fn sticky_error(&self) -> Error {
461        match (self.last_error_kind, self.last_error_message.as_deref()) {
462            (Some(kind), Some(message)) => error_with_kind_message(
463                kind,
464                format!(
465                    "streaming encoder is in an errored state due to previous {kind:?} failure: {message}"
466                ),
467            ),
468            (Some(kind), None) => error_from_kind(kind),
469            (None, Some(message)) => other_error_owned(format!(
470                "streaming encoder is in an errored state: {message}"
471            )),
472            (None, None) => other_error("streaming encoder is in an errored state"),
473        }
474    }
475
476    fn drain_mut(&mut self) -> Result<&mut W, Error> {
477        self.drain
478            .as_mut()
479            .ok_or_else(|| other_error("streaming encoder has no active drain"))
480    }
481
    /// Lazily starts the frame. The first call resets the matcher (priming it
    /// with the dictionary when one is in play), seeds the repeat-offset and
    /// entropy state, resolves the strategy gates, re-creates the checksum
    /// hasher, and writes the frame header to the drain. Once `frame_started`
    /// is set, further calls return immediately.
    ///
    /// # Errors
    ///
    /// Returns an error when the level check fails, when the matcher reports a
    /// window size of zero, or when writing the header fails (that failure is
    /// routed through `fail`).
    fn ensure_frame_started(&mut self) -> Result<(), Error> {
        if self.frame_started {
            return Ok(());
        }

        self.ensure_level_supported()?;
        // A dictionary is only active when it can actually be primed: the level
        // compresses (not `Uncompressed`) AND the matcher supports priming AND a
        // dictionary is attached. Mirrors `FrameCompressor`'s `use_dictionary_state`
        // so a streaming frame never advertises a `Dictionary_ID`, disables
        // single-segment, or seeds dict entropy/offsets unless the dictionary is
        // genuinely in play (otherwise it would emit frames that needlessly
        // require a dictionary at decode time).
        let use_dictionary_state =
            !matches!(self.compression_level, CompressionLevel::Uncompressed)
                && self.state.matcher.supports_dictionary_priming()
                && self.dictionary.is_some();
        // The dictionary content size drives dict-tier match-finder sizing
        // (consumed inside `reset`), so hand it over BEFORE reset.
        if use_dictionary_state && let Some(dict) = self.dictionary.as_ref() {
            self.state
                .matcher
                .set_dictionary_size_hint(dict.inner.dict_content.len());
        }
        self.state.matcher.reset(self.compression_level);
        // Seed the repeat-offset history from the dictionary (upstream zstd
        // `ZSTD_compress_insertDictionary`), or the default rep codes otherwise.
        self.state.offset_hist = if use_dictionary_state {
            self.dictionary
                .as_ref()
                .map(|dict| dict.inner.offset_hist)
                .unwrap_or([1, 4, 8])
        } else {
            [1, 4, 8]
        };
        // Prime the match-finder with the dictionary content + offsets.
        // `dict` borrows `self.dictionary`; `self.state.matcher` is a disjoint
        // field, so the immutable dict borrow and the mutable matcher borrow
        // coexist (field-level borrow splitting) with no conflict.
        if use_dictionary_state && let Some(dict) = self.dictionary.as_ref() {
            let offset_hist = dict.inner.offset_hist;
            self.state
                .matcher
                .prime_with_dictionary(dict.inner.dict_content.as_slice(), offset_hist);
        }
        // Seed the first block's entropy from the dictionary's cached encoder
        // tables (upstream zstd `cdict->cBlockState`), or clear to defaults.
        if use_dictionary_state && let Some(cache) = self.dictionary_entropy_cache.as_ref() {
            self.state.last_huff_table.clone_from(&cache.huff);
            self.state
                .fse_tables
                .ll_previous
                .clone_from(&cache.ll_previous);
            self.state
                .fse_tables
                .ml_previous
                .clone_from(&cache.ml_previous);
            self.state
                .fse_tables
                .of_previous
                .clone_from(&cache.of_previous);
            // Only `Custom` previous tables are handed to the matcher; any
            // other variant (or an absent table) is passed as `None`.
            let ll_entropy = match cache.ll_previous.as_ref() {
                Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
                _ => None,
            };
            let ml_entropy = match cache.ml_previous.as_ref() {
                Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
                _ => None,
            };
            let of_entropy = match cache.of_previous.as_ref() {
                Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
                _ => None,
            };
            self.state.matcher.seed_dictionary_entropy(
                self.state.last_huff_table.as_ref(),
                ll_entropy,
                ml_entropy,
                of_entropy,
            );
        } else {
            self.state.last_huff_table = None;
            self.state.fse_tables.ll_previous = None;
            self.state.fse_tables.ml_previous = None;
            self.state.fse_tables.of_previous = None;
        }
        // Sync `state.strategy_tag` from the active compression level so the
        // literal-compression gates (`min_literals_to_compress`, `min_gain`
        // in `encoding::blocks::compressed`) see the correct strategy for
        // every frame. Mirrors `FrameCompressor::compress` and keeps both
        // entry points byte-equivalent at the gate level. A public-parameter
        // strategy override (#27) wins over the level's derived tag so the
        // gates see the strategy the matcher actually runs.
        self.state.strategy_tag = self.strategy_override.unwrap_or_else(|| {
            crate::encoding::strategy::StrategyTag::for_compression_level(self.compression_level)
        });
        self.state.huf_optimal_search = crate::encoding::frame_compressor::fast_huf_search_enabled(
            self.state.strategy_tag,
            self.pledged_content_size.or(self.source_size_hint),
        );
        // Start the content checksum from a fresh XXH64 state (seed 0).
        #[cfg(feature = "hash")]
        {
            self.hasher = XxHash64::with_seed(0);
        }

        // Read the window size after `reset` (and any dictionary priming) above.
        let window_size = self.state.matcher.window_size();
        if window_size == 0 {
            return Err(invalid_input_error(
                "matcher reported window_size == 0, which is invalid",
            ));
        }

        // Single-segment is incompatible with a dictionary (the dictionary
        // pushes referenceable history before the content, so the frame needs
        // an explicit window descriptor); gate it off when a dict is attached,
        // mirroring `FrameCompressor`'s `!use_dictionary_state` guard.
        // Single-segment also requires the FCS field to be present
        // (`content_size_flag`): the layout drops the window descriptor,
        // so the header must carry the content size for decoders to size
        // their window.
        let single_segment = self.content_size_flag
            && !use_dictionary_state
            && self
                .pledged_content_size
                .map(|size| (512..=(1 << 14)).contains(&size) && size <= window_size)
                .unwrap_or(false);

        let header = FrameHeader {
            frame_content_size: if self.content_size_flag {
                self.pledged_content_size
            } else {
                None
            },
            single_segment,
            content_checksum: cfg!(feature = "hash") && self.content_checksum,
            dictionary_id: if use_dictionary_state && self.dictionary_id_flag {
                self.dictionary.as_ref().map(|dict| dict.inner.id as u64)
            } else {
                None
            },
            window_size: if single_segment {
                None
            } else {
                Some(window_size)
            },
            magicless: self.magicless,
        };
        let mut encoded_header = Vec::new();
        header.serialize(&mut encoded_header);
        self.drain_mut()
            .and_then(|drain| drain.write_all(&encoded_header))
            .map_err(|err| self.fail(err))?;

        // Only mark the frame as started once the header write succeeded.
        self.frame_started = true;
        Ok(())
    }
637
638    fn block_capacity(&self) -> usize {
639        let matcher_window = self.state.matcher.window_size() as usize;
640        let ceiling = self
641            .target_block_size
642            .map_or(MAX_BLOCK_SIZE as usize, |t| t as usize);
643        core::cmp::max(1, core::cmp::min(matcher_window, ceiling))
644    }
645
646    fn allocate_pending_space(&mut self, block_capacity: usize) -> Vec<u8> {
647        let mut space = match self.compression_level {
648            CompressionLevel::Fastest
649            | CompressionLevel::Default
650            | CompressionLevel::Better
651            | CompressionLevel::Best
652            | CompressionLevel::Level(_) => self.state.matcher.get_next_space(),
653            CompressionLevel::Uncompressed => Vec::new(),
654        };
655        space.clear();
656        if space.capacity() > block_capacity {
657            space.shrink_to(block_capacity);
658        }
659        if space.capacity() < block_capacity {
660            space.reserve(block_capacity - space.capacity());
661        }
662        space
663    }
664
665    fn emit_full_pending_block(
666        &mut self,
667        block_capacity: usize,
668        consumed: usize,
669    ) -> Option<Result<usize, Error>> {
670        if self.pending.len() != block_capacity {
671            return None;
672        }
673
674        let new_pending = self.allocate_pending_space(block_capacity);
675        let full_block = mem::replace(&mut self.pending, new_pending);
676        if let Err((err, restored_block)) = self.encode_block(full_block, false) {
677            self.pending = restored_block;
678            let err = self.fail(err);
679            if consumed > 0 {
680                return Some(Ok(consumed));
681            }
682            return Some(Err(err));
683        }
684        None
685    }
686
687    fn emit_pending_block(&mut self, last_block: bool) -> Result<(), Error> {
688        let block = mem::take(&mut self.pending);
689        if let Err((err, restored_block)) = self.encode_block(block, last_block) {
690            self.pending = restored_block;
691            return Err(self.fail(err));
692        }
693        if !last_block {
694            let block_capacity = self.block_capacity();
695            self.pending = self.allocate_pending_space(block_capacity);
696        }
697        Ok(())
698    }
699
700    // Exhaustive match kept intentionally: adding a new CompressionLevel
701    // variant will produce a compile error here, forcing the developer to
702    // decide whether the streaming encoder supports it before shipping.
703    fn ensure_level_supported(&self) -> Result<(), Error> {
704        match self.compression_level {
705            CompressionLevel::Uncompressed
706            | CompressionLevel::Fastest
707            | CompressionLevel::Default
708            | CompressionLevel::Better
709            | CompressionLevel::Best
710            | CompressionLevel::Level(_) => Ok(()),
711        }
712    }
713
714    fn encode_block(
715        &mut self,
716        uncompressed_data: Vec<u8>,
717        last_block: bool,
718    ) -> Result<(), (Error, Vec<u8>)> {
719        let mut raw_block = Some(uncompressed_data);
720        let mut encoded = Vec::new();
721        mem::swap(&mut encoded, &mut self.encoded_scratch);
722        encoded.clear();
723        let needed_capacity = self.block_capacity() + 3;
724        if encoded.capacity() < needed_capacity {
725            encoded.reserve(needed_capacity.saturating_sub(encoded.len()));
726        }
727        let mut moved_into_matcher = false;
728        if raw_block.as_ref().is_some_and(|block| block.is_empty()) {
729            let header = BlockHeader {
730                last_block,
731                block_type: crate::blocks::block::BlockType::Raw,
732                block_size: 0,
733            };
734            header.serialize(&mut encoded);
735        } else {
736            match self.compression_level {
737                CompressionLevel::Uncompressed => {
738                    let block = raw_block.as_ref().expect("raw block missing");
739                    let header = BlockHeader {
740                        last_block,
741                        block_type: crate::blocks::block::BlockType::Raw,
742                        block_size: block.len() as u32,
743                    };
744                    header.serialize(&mut encoded);
745                    encoded.extend_from_slice(block);
746                }
747                CompressionLevel::Fastest
748                | CompressionLevel::Default
749                | CompressionLevel::Better
750                | CompressionLevel::Best
751                | CompressionLevel::Level(_) => {
752                    let block = raw_block.take().expect("raw block missing");
753                    debug_assert!(!block.is_empty(), "empty blocks handled above");
754                    // A primed dictionary makes "incompressible-looking" blocks
755                    // matchable, so the raw-fast-path must NOT fire. But a dict is
756                    // only PRIMED when the matcher supports priming — a non-priming
757                    // matcher ignores the attached dictionary, so the raw-fast-path
758                    // must stay enabled for it. (This arm is already non-Uncompressed.)
759                    // Mirrors `FrameCompressor`'s `dict_active`.
760                    let dict_active = self.dictionary.is_some()
761                        && self.state.matcher.supports_dictionary_priming();
762                    compress_block_encoded(
763                        &mut self.state,
764                        self.compression_level,
765                        last_block,
766                        block,
767                        &mut encoded,
768                        dict_active,
769                        // No FrameEmitInfo on the streaming encoder path — it
770                        // does not surface per-block layout, so no sidecar.
771                        #[cfg(feature = "lsm")]
772                        None,
773                        #[cfg(all(feature = "lsm", feature = "hash"))]
774                        None,
775                    );
776                    moved_into_matcher = true;
777                }
778            }
779        }
780
781        if let Err(err) = self.drain_mut().and_then(|drain| drain.write_all(&encoded)) {
782            encoded.clear();
783            mem::swap(&mut encoded, &mut self.encoded_scratch);
784            let restored = if moved_into_matcher {
785                self.state.matcher.get_last_space().to_vec()
786            } else {
787                raw_block.unwrap_or_default()
788            };
789            return Err((err, restored));
790        }
791
792        if moved_into_matcher {
793            #[cfg(feature = "hash")]
794            if self.content_checksum {
795                self.hasher.write(self.state.matcher.get_last_space());
796            }
797        } else {
798            self.hash_block(raw_block.as_deref().unwrap_or(&[]));
799        }
800        encoded.clear();
801        mem::swap(&mut encoded, &mut self.encoded_scratch);
802        Ok(())
803    }
804
805    fn write_empty_last_block(&mut self) -> Result<(), Error> {
806        self.encode_block(Vec::new(), true).map_err(|(err, _)| err)
807    }
808
809    fn fail(&mut self, err: Error) -> Error {
810        self.errored = true;
811        if self.last_error_kind.is_none() {
812            self.last_error_kind = Some(err.kind());
813        }
814        if self.last_error_message.is_none() {
815            self.last_error_message = Some(err.to_string());
816        }
817        err
818    }
819
820    #[cfg(feature = "hash")]
821    fn hash_block(&mut self, uncompressed_data: &[u8]) {
822        if self.content_checksum {
823            self.hasher.write(uncompressed_data);
824        }
825    }
826
827    #[cfg(not(feature = "hash"))]
828    fn hash_block(&mut self, _uncompressed_data: &[u8]) {}
829}
830
831impl<W: Write, M: Matcher> Write for StreamingEncoder<W, M> {
832    fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
833        self.ensure_open()?;
834        if buf.is_empty() {
835            return Ok(0);
836        }
837
838        // Check pledge before emitting the frame header so that a misuse
839        // like set_pledged_content_size(0) + write(non_empty) doesn't leave
840        // a partially-written header in the drain.
841        if let Some(pledged) = self.pledged_content_size
842            && self.bytes_consumed >= pledged
843        {
844            return Err(invalid_input_error(
845                "write would exceed pledged content size",
846            ));
847        }
848
849        self.ensure_frame_started()?;
850
851        // Enforce pledged upper bound: truncate the accepted slice to the
852        // remaining allowance so that partial-write semantics are honored
853        // (return Ok(n) with n < buf.len()) instead of failing the full call.
854        let buf = if let Some(pledged) = self.pledged_content_size {
855            let remaining_allowed = pledged
856                .checked_sub(self.bytes_consumed)
857                .ok_or_else(|| invalid_input_error("bytes consumed exceed pledged content size"))?;
858            if remaining_allowed == 0 {
859                return Err(invalid_input_error(
860                    "write would exceed pledged content size",
861                ));
862            }
863            let accepted = core::cmp::min(
864                buf.len(),
865                usize::try_from(remaining_allowed).unwrap_or(usize::MAX),
866            );
867            &buf[..accepted]
868        } else {
869            buf
870        };
871
872        let block_capacity = self.block_capacity();
873        if self.pending.capacity() == 0 {
874            self.pending = self.allocate_pending_space(block_capacity);
875        }
876        let mut remaining = buf;
877        let mut consumed = 0usize;
878
879        while !remaining.is_empty() {
880            if let Some(result) = self.emit_full_pending_block(block_capacity, consumed) {
881                return result;
882            }
883
884            let available = block_capacity - self.pending.len();
885            let to_take = core::cmp::min(remaining.len(), available);
886            if to_take == 0 {
887                break;
888            }
889            self.pending.extend_from_slice(&remaining[..to_take]);
890            remaining = &remaining[to_take..];
891            consumed += to_take;
892
893            if let Some(result) = self.emit_full_pending_block(block_capacity, consumed) {
894                if let Ok(n) = &result {
895                    self.bytes_consumed += *n as u64;
896                }
897                return result;
898            }
899        }
900        self.bytes_consumed += consumed as u64;
901        Ok(consumed)
902    }
903
904    fn flush(&mut self) -> Result<(), Error> {
905        self.ensure_open()?;
906        if self.pending.is_empty() {
907            return self
908                .drain_mut()
909                .and_then(|drain| drain.flush())
910                .map_err(|err| self.fail(err));
911        }
912        self.ensure_frame_started()?;
913        self.emit_pending_block(false)?;
914        self.drain_mut()
915            .and_then(|drain| drain.flush())
916            .map_err(|err| self.fail(err))
917    }
918}
919
/// Builds an [`Error`] that carries only the given `kind`.
fn error_from_kind(kind: ErrorKind) -> Error {
    Error::from(kind)
}
923
924fn error_with_kind_message(kind: ErrorKind, message: String) -> Error {
925    #[cfg(feature = "std")]
926    {
927        Error::new(kind, message)
928    }
929    #[cfg(not(feature = "std"))]
930    {
931        Error::new(kind, alloc::boxed::Box::new(message))
932    }
933}
934
935fn invalid_input_error(message: &str) -> Error {
936    #[cfg(feature = "std")]
937    {
938        Error::new(ErrorKind::InvalidInput, message)
939    }
940    #[cfg(not(feature = "std"))]
941    {
942        Error::new(
943            ErrorKind::Other,
944            alloc::boxed::Box::new(alloc::string::String::from(message)),
945        )
946    }
947}
948
949fn other_error_owned(message: String) -> Error {
950    #[cfg(feature = "std")]
951    {
952        Error::other(message)
953    }
954    #[cfg(not(feature = "std"))]
955    {
956        Error::new(ErrorKind::Other, alloc::boxed::Box::new(message))
957    }
958}
959
960fn other_error(message: &str) -> Error {
961    #[cfg(feature = "std")]
962    {
963        Error::other(message)
964    }
965    #[cfg(not(feature = "std"))]
966    {
967        Error::new(
968            ErrorKind::Other,
969            alloc::boxed::Box::new(alloc::string::String::from(message)),
970        )
971    }
972}
973
974#[cfg(test)]
975mod tests {
976    use crate::decoding::StreamingDecoder;
977    use crate::encoding::{CompressionLevel, Matcher, Sequence, StreamingEncoder};
978    use crate::io::{Error, ErrorKind, Read, Write};
979    use alloc::vec;
980    use alloc::vec::Vec;
981
982    struct TinyMatcher {
983        last_space: Vec<u8>,
984        window_size: u64,
985    }
986
987    impl TinyMatcher {
988        fn new(window_size: u64) -> Self {
989            Self {
990                last_space: Vec::new(),
991                window_size,
992            }
993        }
994    }
995
996    impl Matcher for TinyMatcher {
997        fn get_next_space(&mut self) -> Vec<u8> {
998            vec![0; self.window_size as usize]
999        }
1000
1001        fn get_last_space(&mut self) -> &[u8] {
1002            self.last_space.as_slice()
1003        }
1004
1005        fn commit_space(&mut self, space: Vec<u8>) {
1006            self.last_space = space;
1007        }
1008
1009        fn skip_matching(&mut self) {}
1010
1011        fn start_matching(&mut self, mut handle_sequence: impl for<'a> FnMut(Sequence<'a>)) {
1012            handle_sequence(Sequence::Literals {
1013                literals: self.last_space.as_slice(),
1014            });
1015        }
1016
1017        fn reset(&mut self, _level: CompressionLevel) {
1018            self.last_space.clear();
1019        }
1020
1021        fn window_size(&self) -> u64 {
1022            self.window_size
1023        }
1024    }
1025
1026    struct FailingWriteOnce {
1027        writes: usize,
1028        fail_on_write_number: usize,
1029        sink: Vec<u8>,
1030    }
1031
1032    impl FailingWriteOnce {
1033        fn new(fail_on_write_number: usize) -> Self {
1034            Self {
1035                writes: 0,
1036                fail_on_write_number,
1037                sink: Vec::new(),
1038            }
1039        }
1040    }
1041
1042    impl Write for FailingWriteOnce {
1043        fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
1044            self.writes += 1;
1045            if self.writes == self.fail_on_write_number {
1046                return Err(super::other_error("injected write failure"));
1047            }
1048            self.sink.extend_from_slice(buf);
1049            Ok(buf.len())
1050        }
1051
1052        fn flush(&mut self) -> Result<(), Error> {
1053            Ok(())
1054        }
1055    }
1056
1057    struct FailingWithKind {
1058        writes: usize,
1059        fail_on_write_number: usize,
1060        kind: ErrorKind,
1061    }
1062
1063    impl FailingWithKind {
1064        fn new(fail_on_write_number: usize, kind: ErrorKind) -> Self {
1065            Self {
1066                writes: 0,
1067                fail_on_write_number,
1068                kind,
1069            }
1070        }
1071    }
1072
1073    impl Write for FailingWithKind {
1074        fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
1075            self.writes += 1;
1076            if self.writes == self.fail_on_write_number {
1077                return Err(Error::from(self.kind));
1078            }
1079            Ok(buf.len())
1080        }
1081
1082        fn flush(&mut self) -> Result<(), Error> {
1083            Ok(())
1084        }
1085    }
1086
1087    struct PartialThenFailWriter {
1088        writes: usize,
1089        fail_on_write_number: usize,
1090        partial_prefix_len: usize,
1091        terminal_failure: bool,
1092        sink: Vec<u8>,
1093    }
1094
1095    impl PartialThenFailWriter {
1096        fn new(fail_on_write_number: usize, partial_prefix_len: usize) -> Self {
1097            Self {
1098                writes: 0,
1099                fail_on_write_number,
1100                partial_prefix_len,
1101                terminal_failure: false,
1102                sink: Vec::new(),
1103            }
1104        }
1105    }
1106
1107    impl Write for PartialThenFailWriter {
1108        fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
1109            if self.terminal_failure {
1110                return Err(super::other_error("injected terminal write failure"));
1111            }
1112
1113            self.writes += 1;
1114            if self.writes == self.fail_on_write_number {
1115                let written = core::cmp::min(self.partial_prefix_len, buf.len());
1116                if written > 0 {
1117                    self.sink.extend_from_slice(&buf[..written]);
1118                    self.terminal_failure = true;
1119                    return Ok(written);
1120                }
1121                return Err(super::other_error("injected terminal write failure"));
1122            }
1123
1124            self.sink.extend_from_slice(buf);
1125            Ok(buf.len())
1126        }
1127
1128        fn flush(&mut self) -> Result<(), Error> {
1129            Ok(())
1130        }
1131    }
1132
1133    /// Pre-write `set_magicless(true)` → emitted frame omits the
1134    /// magic prefix AND round-trips through a magicless-aware
1135    /// decoder.
1136    #[test]
1137    fn streaming_encoder_set_magicless_before_write_omits_magic_and_roundtrips() {
1138        use crate::common::MAGIC_NUM;
1139        let payload = b"streaming-magicless-roundtrip-".repeat(64);
1140
1141        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1142        encoder
1143            .set_magicless(true)
1144            .expect("set_magicless pre-write");
1145        encoder.write_all(&payload).unwrap();
1146        let compressed = encoder.finish().unwrap();
1147
1148        assert!(
1149            !compressed.starts_with(&MAGIC_NUM.to_le_bytes()),
1150            "magicless frame must omit the 4-byte magic prefix",
1151        );
1152
1153        let mut decoder = crate::decoding::FrameDecoder::new();
1154        decoder.set_magicless(true);
1155        let mut cursor: &[u8] = compressed.as_slice();
1156        decoder.init(&mut cursor).expect("magicless init");
1157        decoder
1158            .decode_blocks(&mut cursor, crate::decoding::BlockDecodingStrategy::All)
1159            .expect("decode_blocks");
1160        let mut decoded: Vec<u8> = Vec::new();
1161        decoder
1162            .collect_to_writer(&mut decoded)
1163            .expect("collect_to_writer");
1164        assert_eq!(decoded, payload);
1165    }
1166
1167    /// `set_magicless` after the first write MUST return an error
1168    /// (the frame header has already been emitted, flipping the flag
1169    /// can't affect the current frame). Mirrors
1170    /// `set_pledged_content_size` / `set_source_size_hint` semantics.
1171    #[test]
1172    fn streaming_encoder_set_magicless_after_first_write_errors() {
1173        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1174        encoder.write_all(b"first-block").unwrap();
1175        let err = encoder
1176            .set_magicless(true)
1177            .expect_err("set_magicless after first write must error");
1178        assert_eq!(
1179            err.kind(),
1180            crate::io::ErrorKind::InvalidInput,
1181            "expected InvalidInput when setting magicless after frame_started, got {err:?}",
1182        );
1183    }
1184
1185    #[test]
1186    fn streaming_encoder_roundtrip_multiple_writes() {
1187        let payload = b"streaming-encoder-roundtrip-".repeat(1024);
1188        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1189        for chunk in payload.chunks(313) {
1190            encoder.write_all(chunk).unwrap();
1191        }
1192        let compressed = encoder.finish().unwrap();
1193
1194        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1195        let mut decoded = Vec::new();
1196        decoder.read_to_end(&mut decoded).unwrap();
1197        assert_eq!(decoded, payload);
1198    }
1199
1200    #[test]
1201    fn flush_emits_nonempty_partial_output() {
1202        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1203        encoder.write_all(b"partial-block").unwrap();
1204        encoder.flush().unwrap();
1205        let flushed_len = encoder.get_ref().len();
1206        assert!(
1207            flushed_len > 0,
1208            "flush should emit header+partial block bytes"
1209        );
1210        let compressed = encoder.finish().unwrap();
1211        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1212        let mut decoded = Vec::new();
1213        decoder.read_to_end(&mut decoded).unwrap();
1214        assert_eq!(decoded, b"partial-block");
1215    }
1216
1217    #[test]
1218    fn flush_without_writes_does_not_emit_frame_header() {
1219        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1220        encoder.flush().unwrap();
1221        assert!(encoder.get_ref().is_empty());
1222    }
1223
1224    #[test]
1225    fn block_boundary_write_emits_block_in_same_call() {
1226        let mut boundary = StreamingEncoder::new_with_matcher(
1227            TinyMatcher::new(4),
1228            Vec::new(),
1229            CompressionLevel::Uncompressed,
1230        );
1231        let mut below = StreamingEncoder::new_with_matcher(
1232            TinyMatcher::new(4),
1233            Vec::new(),
1234            CompressionLevel::Uncompressed,
1235        );
1236
1237        boundary.write_all(b"ABCD").unwrap();
1238        below.write_all(b"ABC").unwrap();
1239
1240        let boundary_len = boundary.get_ref().len();
1241        let below_len = below.get_ref().len();
1242        assert!(
1243            boundary_len > below_len,
1244            "full block should be emitted immediately at block boundary"
1245        );
1246    }
1247
1248    #[test]
1249    fn finish_consumes_encoder_and_emits_frame() {
1250        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1251        encoder.write_all(b"abc").unwrap();
1252        let compressed = encoder.finish().unwrap();
1253        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1254        let mut decoded = Vec::new();
1255        decoder.read_to_end(&mut decoded).unwrap();
1256        assert_eq!(decoded, b"abc");
1257    }
1258
1259    #[test]
1260    fn finish_without_writes_emits_empty_frame() {
1261        let encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1262        let compressed = encoder.finish().unwrap();
1263        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1264        let mut decoded = Vec::new();
1265        decoder.read_to_end(&mut decoded).unwrap();
1266        assert!(decoded.is_empty());
1267    }
1268
1269    #[test]
1270    fn write_empty_buffer_returns_zero() {
1271        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1272        assert_eq!(encoder.write(&[]).unwrap(), 0);
1273        let _ = encoder.finish().unwrap();
1274    }
1275
1276    #[test]
1277    fn uncompressed_level_roundtrip() {
1278        let payload = b"uncompressed-streaming-roundtrip".repeat(64);
1279        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Uncompressed);
1280        for chunk in payload.chunks(41) {
1281            encoder.write_all(chunk).unwrap();
1282        }
1283        let compressed = encoder.finish().unwrap();
1284        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1285        let mut decoded = Vec::new();
1286        decoder.read_to_end(&mut decoded).unwrap();
1287        assert_eq!(decoded, payload);
1288    }
1289
1290    #[test]
1291    fn better_level_streaming_roundtrip() {
1292        let payload = b"better-level-streaming-test".repeat(256);
1293        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Better);
1294        for chunk in payload.chunks(53) {
1295            encoder.write_all(chunk).unwrap();
1296        }
1297        let compressed = encoder.finish().unwrap();
1298        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1299        let mut decoded = Vec::new();
1300        decoder.read_to_end(&mut decoded).unwrap();
1301        assert_eq!(decoded, payload);
1302    }
1303
1304    #[test]
1305    fn zero_window_matcher_returns_invalid_input_error() {
1306        let mut encoder = StreamingEncoder::new_with_matcher(
1307            TinyMatcher::new(0),
1308            Vec::new(),
1309            CompressionLevel::Fastest,
1310        );
1311        let err = encoder.write_all(b"payload").unwrap_err();
1312        assert_eq!(err.kind(), ErrorKind::InvalidInput);
1313    }
1314
1315    #[test]
1316    fn best_level_streaming_roundtrip() {
1317        // 200 KiB payload crosses the 128 KiB block boundary, exercising
1318        // multi-block emission and matcher state carry-over for Best.
1319        let payload = b"best-level-streaming-test".repeat(8 * 1024);
1320        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Best);
1321        for chunk in payload.chunks(53) {
1322            encoder.write_all(chunk).unwrap();
1323        }
1324        let compressed = encoder.finish().unwrap();
1325        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1326        let mut decoded = Vec::new();
1327        decoder.read_to_end(&mut decoded).unwrap();
1328        assert_eq!(decoded, payload);
1329    }
1330
1331    #[test]
1332    fn write_failure_poisoning_is_sticky() {
1333        let mut encoder = StreamingEncoder::new_with_matcher(
1334            TinyMatcher::new(4),
1335            FailingWriteOnce::new(1),
1336            CompressionLevel::Uncompressed,
1337        );
1338
1339        assert!(encoder.write_all(b"ABCD").is_err());
1340        assert!(encoder.flush().is_err());
1341        assert!(encoder.write_all(b"EFGH").is_err());
1342        assert_eq!(encoder.get_ref().sink.len(), 0);
1343        assert!(encoder.finish().is_err());
1344    }
1345
1346    #[test]
1347    fn poisoned_encoder_returns_original_error_kind() {
1348        let mut encoder = StreamingEncoder::new_with_matcher(
1349            TinyMatcher::new(4),
1350            FailingWithKind::new(1, ErrorKind::BrokenPipe),
1351            CompressionLevel::Uncompressed,
1352        );
1353
1354        let first_error = encoder.write_all(b"ABCD").unwrap_err();
1355        assert_eq!(first_error.kind(), ErrorKind::BrokenPipe);
1356
1357        let second_error = encoder.write_all(b"EFGH").unwrap_err();
1358        assert_eq!(second_error.kind(), ErrorKind::BrokenPipe);
1359    }
1360
1361    #[test]
1362    fn write_reports_progress_but_poisoning_is_sticky_after_later_block_failure() {
1363        let payload = b"ABCDEFGHIJKL";
1364        let mut encoder = StreamingEncoder::new_with_matcher(
1365            TinyMatcher::new(4),
1366            FailingWriteOnce::new(3),
1367            CompressionLevel::Uncompressed,
1368        );
1369
1370        let first_write = encoder.write(payload).unwrap();
1371        assert_eq!(first_write, 8);
1372        assert!(encoder.write(&payload[first_write..]).is_err());
1373        assert!(encoder.flush().is_err());
1374        assert!(encoder.write_all(b"EFGH").is_err());
1375    }
1376
1377    #[test]
1378    fn partial_write_failure_after_progress_poisons_encoder() {
1379        let payload = b"ABCDEFGHIJKL";
1380        let mut encoder = StreamingEncoder::new_with_matcher(
1381            TinyMatcher::new(4),
1382            PartialThenFailWriter::new(3, 1),
1383            CompressionLevel::Uncompressed,
1384        );
1385
1386        let first_write = encoder.write(payload).unwrap();
1387        assert_eq!(first_write, 8);
1388
1389        let second_write = encoder.write(&payload[first_write..]);
1390        assert!(second_write.is_err());
1391        assert!(encoder.flush().is_err());
1392        assert!(encoder.write_all(b"MNOP").is_err());
1393    }
1394
1395    #[test]
1396    fn new_with_matcher_and_get_mut_work() {
1397        let matcher = TinyMatcher::new(128 * 1024);
1398        let mut encoder =
1399            StreamingEncoder::new_with_matcher(matcher, Vec::new(), CompressionLevel::Fastest);
1400        encoder.get_mut().extend_from_slice(b"");
1401        encoder.write_all(b"custom-matcher").unwrap();
1402        let compressed = encoder.finish().unwrap();
1403        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1404        let mut decoded = Vec::new();
1405        decoder.read_to_end(&mut decoded).unwrap();
1406        assert_eq!(decoded, b"custom-matcher");
1407    }
1408
1409    #[test]
1410    fn pledged_content_size_written_in_header() {
1411        let payload = b"hello world, pledged size test";
1412        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1413        encoder
1414            .set_pledged_content_size(payload.len() as u64)
1415            .unwrap();
1416        encoder.write_all(payload).unwrap();
1417        let compressed = encoder.finish().unwrap();
1418
1419        // Verify FCS is present and correct
1420        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1421            .unwrap()
1422            .0;
1423        assert_eq!(header.frame_content_size(), payload.len() as u64);
1424
1425        // Verify roundtrip
1426        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1427        let mut decoded = Vec::new();
1428        decoder.read_to_end(&mut decoded).unwrap();
1429        assert_eq!(decoded, payload);
1430    }
1431
1432    #[test]
1433    fn pledged_content_size_mismatch_returns_error() {
1434        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1435        encoder.set_pledged_content_size(100).unwrap();
1436        encoder.write_all(b"short payload").unwrap(); // 13 bytes != 100 pledged
1437        let err = encoder.finish().unwrap_err();
1438        assert_eq!(err.kind(), ErrorKind::InvalidInput);
1439    }
1440
1441    #[test]
1442    fn write_exceeding_pledge_returns_error() {
1443        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1444        encoder.set_pledged_content_size(5).unwrap();
1445        let err = encoder.write_all(b"exceeds five bytes").unwrap_err();
1446        assert_eq!(err.kind(), ErrorKind::InvalidInput);
1447    }
1448
1449    #[test]
1450    fn write_straddling_pledge_reports_partial_progress() {
1451        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1452        encoder.set_pledged_content_size(5).unwrap();
1453        // write() should accept exactly 5 bytes (partial progress)
1454        assert_eq!(encoder.write(b"abcdef").unwrap(), 5);
1455        // Next write should fail — pledge exhausted
1456        let err = encoder.write(b"g").unwrap_err();
1457        assert_eq!(err.kind(), ErrorKind::InvalidInput);
1458    }
1459
1460    #[test]
1461    fn encoded_scratch_capacity_is_reused_across_blocks() {
1462        let payload = vec![0xAB; 64 * 3];
1463        let mut encoder = StreamingEncoder::new_with_matcher(
1464            TinyMatcher::new(64),
1465            Vec::new(),
1466            CompressionLevel::Uncompressed,
1467        );
1468
1469        encoder.write_all(&payload[..64]).unwrap();
1470        let first_capacity = encoder.encoded_scratch.capacity();
1471        assert!(
1472            first_capacity >= 67,
1473            "expected encoded scratch to keep block header + payload capacity",
1474        );
1475
1476        encoder.write_all(&payload[64..128]).unwrap();
1477        let second_capacity = encoder.encoded_scratch.capacity();
1478        assert!(
1479            second_capacity >= first_capacity,
1480            "encoded scratch capacity should be reused across block emits",
1481        );
1482
1483        encoder.write_all(&payload[128..]).unwrap();
1484        let compressed = encoder.finish().unwrap();
1485        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1486        let mut decoded = Vec::new();
1487        decoder.read_to_end(&mut decoded).unwrap();
1488        assert_eq!(decoded, payload);
1489    }
1490
1491    #[test]
1492    fn pledged_content_size_after_write_returns_error() {
1493        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1494        encoder.write_all(b"already writing").unwrap();
1495        let err = encoder.set_pledged_content_size(15).unwrap_err();
1496        assert_eq!(err.kind(), ErrorKind::InvalidInput);
1497    }
1498
1499    #[test]
1500    fn source_size_hint_directly_reduces_window_header() {
1501        let payload = b"streaming-source-size-hint".repeat(64);
1502
1503        let mut no_hint = StreamingEncoder::new(Vec::new(), CompressionLevel::from_level(11));
1504        no_hint.write_all(payload.as_slice()).unwrap();
1505        let no_hint_frame = no_hint.finish().unwrap();
1506        let no_hint_header = crate::decoding::frame::read_frame_header(no_hint_frame.as_slice())
1507            .unwrap()
1508            .0;
1509        let no_hint_window = no_hint_header.window_size().unwrap();
1510
1511        let mut with_hint = StreamingEncoder::new(Vec::new(), CompressionLevel::from_level(11));
1512        with_hint
1513            .set_source_size_hint(payload.len() as u64)
1514            .unwrap();
1515        with_hint.write_all(payload.as_slice()).unwrap();
1516        let late_hint_err = with_hint
1517            .set_source_size_hint(payload.len() as u64)
1518            .unwrap_err();
1519        assert_eq!(late_hint_err.kind(), ErrorKind::InvalidInput);
1520        let with_hint_frame = with_hint.finish().unwrap();
1521        let with_hint_header =
1522            crate::decoding::frame::read_frame_header(with_hint_frame.as_slice())
1523                .unwrap()
1524                .0;
1525        let with_hint_window = with_hint_header.window_size().unwrap();
1526
1527        assert!(
1528            with_hint_window <= no_hint_window,
1529            "source size hint should not increase advertised window"
1530        );
1531
1532        let mut decoder = StreamingDecoder::new(with_hint_frame.as_slice()).unwrap();
1533        let mut decoded = Vec::new();
1534        decoder.read_to_end(&mut decoded).unwrap();
1535        assert_eq!(decoded, payload);
1536    }
1537
1538    #[test]
1539    fn single_segment_requires_pledged_to_fit_matcher_window() {
1540        let payload = b"streaming-window-gate-".repeat(60); // 1320 bytes
1541        let mut encoder = StreamingEncoder::new_with_matcher(
1542            TinyMatcher::new(1024),
1543            Vec::new(),
1544            CompressionLevel::Fastest,
1545        );
1546        encoder
1547            .set_pledged_content_size(payload.len() as u64)
1548            .unwrap();
1549        encoder.write_all(payload.as_slice()).unwrap();
1550        let compressed = encoder.finish().unwrap();
1551
1552        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1553            .unwrap()
1554            .0;
1555        assert_eq!(header.frame_content_size(), payload.len() as u64);
1556        assert!(
1557            !header.descriptor.single_segment_flag(),
1558            "single-segment must stay off when pledged content size exceeds matcher window"
1559        );
1560        assert!(
1561            header.window_size().unwrap() >= 1024,
1562            "window descriptor should be present when single-segment is disabled"
1563        );
1564    }
1565
1566    #[test]
1567    fn ensure_frame_started_refreshes_stale_strategy_tag_at_reset() {
1568        // The literal-compression gates (`min_literals_to_compress`,
1569        // `min_gain`) read `state.strategy_tag`. Regression: every
1570        // reset site MUST refresh that tag from the active compression
1571        // level — relying on construction-time initialization alone is
1572        // not enough, because later mutations or reuse patterns can
1573        // leave the tag stale.
1574        //
1575        // To exercise the RESET-time refresh (not just the
1576        // construction-time init that `StreamingEncoder::new` does for
1577        // free), this test deliberately corrupts `state.strategy_tag`
1578        // to a value that does NOT match the active level, then
1579        // triggers `ensure_frame_started` and asserts the reset path
1580        // wrote the correct tag back. If the sync line in
1581        // `ensure_frame_started` were deleted, the corrupted value
1582        // would survive the write and fail the assertion.
1583        use crate::encoding::strategy::StrategyTag;
1584        for level in [
1585            CompressionLevel::Fastest,
1586            CompressionLevel::Default,
1587            CompressionLevel::Better,
1588            CompressionLevel::Best,
1589        ] {
1590            let expected = StrategyTag::for_compression_level(level);
1591            let mut encoder = StreamingEncoder::new(Vec::new(), level);
1592            // Pick a sentinel that differs from the legitimate tag so
1593            // a missing reset-time sync is observable. BtUltra2 is the
1594            // most-aggressive variant; the four levels above resolve
1595            // to Fast/Dfast/Lazy/Lazy respectively, none equal to it.
1596            let sentinel = StrategyTag::BtUltra2;
1597            assert_ne!(
1598                expected, sentinel,
1599                "sentinel must differ from the legitimate tag at level {level:?}",
1600            );
1601            encoder.state.strategy_tag = sentinel;
1602            encoder.write_all(b"x").unwrap();
1603            assert_eq!(
1604                encoder.state.strategy_tag, expected,
1605                "reset-time strategy_tag sync missing at level {level:?}: \
1606                 sentinel survived `ensure_frame_started`",
1607            );
1608            let _ = encoder.finish().unwrap();
1609        }
1610    }
1611
1612    /// Level 22 advertises the largest default window (`window_log 27` =
1613    /// 128 MiB). Because streaming omits FCS, that window is written verbatim
1614    /// into the frame header — so the encoder's max window MUST NOT exceed the
1615    /// decoder's [`crate::common::MAXIMUM_ALLOWED_WINDOW_SIZE`], or our own
1616    /// decoder rejects our own frame with `WindowSizeTooBig`. Regression for
1617    /// the encoder↔decoder window-cap mismatch: streaming L22 must round-trip
1618    /// through `StreamingDecoder` (and, implicitly, any stock zstd decoder,
1619    /// which accepts up to the same 128 MiB default).
1620    #[test]
1621    fn level_22_streaming_window_roundtrips_in_our_decoder() {
1622        let payload = b"level-22-streaming-window-cap-".repeat(512);
1623        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::from_level(22));
1624        for chunk in payload.chunks(101) {
1625            encoder.write_all(chunk).unwrap();
1626        }
1627        let compressed = encoder.finish().unwrap();
1628
1629        // The advertised window equals the L22 default (128 MiB) and must sit
1630        // at or below the decoder cap — otherwise the round-trip below fails.
1631        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1632            .unwrap()
1633            .0;
1634        let window = header.window_size().unwrap();
1635        assert!(
1636            window <= crate::common::MAXIMUM_ALLOWED_WINDOW_SIZE,
1637            "L22 advertised window {window} exceeds decoder cap {}",
1638            crate::common::MAXIMUM_ALLOWED_WINDOW_SIZE,
1639        );
1640
1641        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1642        let mut decoded = Vec::new();
1643        decoder.read_to_end(&mut decoded).unwrap();
1644        assert_eq!(decoded, payload);
1645    }
1646
1647    /// `set_content_checksum(false)` before the first write must clear the
1648    /// frame header's `Content_Checksum_flag` and the frame must still
1649    /// round-trip through the decoder.
1650    #[test]
1651    fn streaming_encoder_set_content_checksum_false_clears_header_flag() {
1652        let payload = b"streaming-checksum-toggle-".repeat(64);
1653        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1654        encoder
1655            .set_content_checksum(false)
1656            .expect("set_content_checksum pre-write");
1657        encoder.write_all(&payload).unwrap();
1658        let compressed = encoder.finish().unwrap();
1659
1660        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1661            .unwrap()
1662            .0;
1663        assert!(
1664            !header.descriptor.content_checksum_flag(),
1665            "content_checksum(false) must clear the frame header flag",
1666        );
1667
1668        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1669        let mut decoded = Vec::new();
1670        decoder.read_to_end(&mut decoded).unwrap();
1671        assert_eq!(decoded, payload);
1672    }
1673
1674    /// With the `hash` feature, disabling the checksum must drop exactly the
1675    /// 4-byte XXH64 trailer: the same payload encoded with the checksum on is
1676    /// 4 bytes longer and its header flag is set.
1677    #[cfg(feature = "hash")]
1678    #[test]
1679    fn streaming_encoder_set_content_checksum_false_omits_trailer() {
1680        let payload = b"streaming-checksum-trailer-".repeat(64);
1681
1682        let mut with = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1683        // Explicit: the encoder default is off (upstream library parity).
1684        with.set_content_checksum(true)
1685            .expect("set_content_checksum pre-write");
1686        with.write_all(&payload).unwrap();
1687        let with_checksum = with.finish().unwrap();
1688
1689        let mut without = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1690        without
1691            .set_content_checksum(false)
1692            .expect("set_content_checksum pre-write");
1693        without.write_all(&payload).unwrap();
1694        let without_checksum = without.finish().unwrap();
1695
1696        assert!(
1697            crate::decoding::frame::read_frame_header(with_checksum.as_slice())
1698                .unwrap()
1699                .0
1700                .descriptor
1701                .content_checksum_flag(),
1702            "default checksum-on frame must set the header flag",
1703        );
1704        assert_eq!(
1705            with_checksum.len(),
1706            without_checksum.len() + 4,
1707            "checksum-on frame must carry exactly the 4-byte XXH64 trailer",
1708        );
1709    }
1710
1711    /// `set_content_checksum` after the first write must error: the frame
1712    /// header (and its checksum flag) is already emitted, so a late flip would
1713    /// desync the header flag from the emitted trailer. Mirrors
1714    /// `set_magicless` / `set_pledged_content_size` semantics.
1715    #[test]
1716    fn streaming_encoder_set_content_checksum_after_first_write_errors() {
1717        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1718        encoder.write_all(b"first-block").unwrap();
1719        let err = encoder
1720            .set_content_checksum(false)
1721            .expect_err("set_content_checksum after first write must error");
1722        assert_eq!(
1723            err.kind(),
1724            ErrorKind::InvalidInput,
1725            "expected InvalidInput when setting content checksum after frame_started, got {err:?}",
1726        );
1727    }
1728
1729    #[test]
1730    fn no_pledged_size_omits_fcs_from_header() {
1731        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1732        encoder.write_all(b"no pledged size").unwrap();
1733        let compressed = encoder.finish().unwrap();
1734
1735        // FCS should be omitted from the header; the decoder reports absent FCS as 0.
1736        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1737            .unwrap()
1738            .0;
1739        assert_eq!(header.frame_content_size(), 0);
1740        // Verify the descriptor confirms FCS field is truly absent (0 bytes),
1741        // not just FCS present with value 0.
1742        assert_eq!(header.descriptor.frame_content_size_bytes().unwrap(), 0);
1743    }
1744
1745    #[test]
1746    fn streaming_encoder_with_dictionary_roundtrips_and_carries_dict_id() {
1747        use alloc::format;
1748        let dict_raw = include_bytes!("../../dict_tests/dictionary");
1749        let dict_id = crate::decoding::Dictionary::decode_dict(dict_raw)
1750            .unwrap()
1751            .id;
1752
1753        // Dictionary-resembling payload (the dict was trained on similar lines),
1754        // fed in many small writes so the dict + cross-block matching are both
1755        // exercised by the streaming path.
1756        let mut payload = Vec::new();
1757        for i in 0..400u32 {
1758            payload.extend_from_slice(
1759                format!("tenant=demo table=orders key={i} region=eu payload=aaaaabbbbbccccc\n")
1760                    .as_bytes(),
1761            );
1762        }
1763
1764        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Level(19));
1765        encoder
1766            .set_dictionary_from_bytes(dict_raw)
1767            .expect("attach dictionary");
1768        for chunk in payload.chunks(777) {
1769            encoder.write_all(chunk).unwrap();
1770        }
1771        let compressed = encoder.finish().unwrap();
1772
1773        // The frame header advertises the dictionary ID (single-segment is
1774        // disabled for dictionary frames, so an explicit window is present).
1775        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1776            .unwrap()
1777            .0;
1778        assert_eq!(header.dictionary_id(), Some(dict_id));
1779
1780        // Round-trip through a decoder primed with the SAME dictionary.
1781        let mut decoder =
1782            StreamingDecoder::new_with_dictionary_bytes(compressed.as_slice(), dict_raw).unwrap();
1783        let mut decoded = Vec::new();
1784        decoder.read_to_end(&mut decoded).unwrap();
1785        assert_eq!(decoded, payload);
1786
1787        // The dictionary is actually used: the dict frame is no larger than the
1788        // no-dictionary frame on this dict-resembling payload (a dict that was
1789        // ignored could only ever make the frame the same size or bigger).
1790        let mut nodict = StreamingEncoder::new(Vec::new(), CompressionLevel::Level(19));
1791        for chunk in payload.chunks(777) {
1792            nodict.write_all(chunk).unwrap();
1793        }
1794        let nodict_frame = nodict.finish().unwrap();
1795        assert!(
1796            compressed.len() <= nodict_frame.len(),
1797            "dict frame {} should not exceed no-dict frame {}",
1798            compressed.len(),
1799            nodict_frame.len()
1800        );
1801    }
1802
1803    #[test]
1804    fn streaming_encoder_strategy_override_survives_frame_start() {
1805        // A `.strategy(...)` override must drive BOTH the matcher and the
1806        // literal-compression gates (`state.strategy_tag`) once the frame
1807        // starts. `ensure_frame_started` re-syncs the tag, so without persisting
1808        // the override it would silently fall back to the level's strategy and
1809        // diverge from `FrameCompressor` for the same parameters.
1810        use crate::encoding::{CompressionParameters, Strategy};
1811        let level = CompressionLevel::Fastest;
1812        let level_tag = crate::encoding::strategy::StrategyTag::for_compression_level(level);
1813        let override_tag = Strategy::Greedy.tag();
1814        assert_ne!(
1815            level_tag, override_tag,
1816            "test needs an override that changes the derived tag"
1817        );
1818
1819        let params = CompressionParameters::builder(level)
1820            .strategy(Strategy::Greedy)
1821            .build()
1822            .unwrap();
1823        let payload = b"override must outlive the frame header";
1824        let mut encoder = StreamingEncoder::new(Vec::new(), level);
1825        encoder.set_parameters(&params).unwrap();
1826        encoder.write_all(payload).unwrap();
1827        assert_eq!(
1828            encoder.state.strategy_tag, override_tag,
1829            "strategy override was discarded when the frame started"
1830        );
1831
1832        let compressed = encoder.finish().unwrap();
1833        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1834        let mut decoded = Vec::new();
1835        decoder.read_to_end(&mut decoded).unwrap();
1836        assert_eq!(decoded, payload);
1837    }
1838
1839    #[test]
1840    fn streaming_encoder_uncompressed_with_dictionary_omits_dict_id() {
1841        // At `Uncompressed` the matcher cannot prime a dictionary, so an
1842        // attached dictionary must NOT be reflected in the frame: advertising a
1843        // `Dictionary_ID` would force a dictionary at decode time for a frame
1844        // that does not actually depend on one. Mirrors `FrameCompressor`'s
1845        // `use_dictionary_state` gate.
1846        let dict_raw = include_bytes!("../../dict_tests/dictionary");
1847        let payload = b"tenant=demo table=orders region=eu payload=aaaaabbbbbccccc";
1848        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Uncompressed);
1849        encoder
1850            .set_dictionary_from_bytes(dict_raw)
1851            .expect("attach dictionary");
1852        encoder.write_all(payload).unwrap();
1853        let compressed = encoder.finish().unwrap();
1854
1855        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1856            .unwrap()
1857            .0;
1858        assert_eq!(
1859            header.dictionary_id(),
1860            None,
1861            "uncompressed frame must not require a dictionary at decode time"
1862        );
1863
1864        // Decodes WITHOUT any dictionary.
1865        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1866        let mut decoded = Vec::new();
1867        decoder.read_to_end(&mut decoded).unwrap();
1868        assert_eq!(decoded, payload);
1869    }
1870}