Skip to main content

structured_zstd/encoding/
streaming_encoder.rs

1use alloc::format;
2use alloc::string::{String, ToString};
3use alloc::vec::Vec;
4use core::mem;
5
6use crate::common::MAX_BLOCK_SIZE;
7#[cfg(feature = "hash")]
8use core::hash::Hasher;
9#[cfg(feature = "hash")]
10use twox_hash::XxHash64;
11
12use crate::encoding::levels::compress_block_encoded;
13use crate::encoding::{
14    CompressionLevel, EncoderDictionary, MatchGeneratorDriver, Matcher, block_header::BlockHeader,
15    frame_compressor::CachedDictionaryEntropy, frame_compressor::CompressState,
16    frame_compressor::FseTables, frame_compressor::PreviousFseTable, frame_header::FrameHeader,
17};
18use crate::io::{Error, ErrorKind, Write};
19
/// Incremental frame encoder that implements [`Write`].
///
/// Data can be provided with multiple `write()` calls. Full blocks are compressed
/// automatically, `flush()` emits the currently buffered partial block as non-last,
/// and `finish()` closes the frame and returns the wrapped writer.
///
/// All configuration setters (`set_*`) must run before the first write: once the
/// frame header has been emitted they return an error instead of silently
/// producing a frame that disagrees with its header.
pub struct StreamingEncoder<W: Write, M: Matcher = MatchGeneratorDriver> {
    /// Output sink. `Some` for the whole encoder lifetime; `finish` takes it
    /// out and hands it back to the caller.
    drain: Option<W>,
    /// Level applied to every block of the frame (replaceable through
    /// `set_parameters` before the first write).
    compression_level: CompressionLevel,
    /// Matcher plus the per-frame entropy state: previous Huffman / FSE
    /// tables, repeat-offset history and the effective strategy tag.
    state: CompressState<M>,
    /// Uncompressed bytes staged for the next block; once it reaches the
    /// block capacity it is compressed as a full, non-last block.
    pending: Vec<u8>,
    /// Reusable buffer that receives each encoded block before it is written.
    encoded_scratch: Vec<u8>,
    /// When `true` the encoder is poisoned: `ensure_open` rejects every later
    /// call with an error rebuilt from the two fields below.
    errored: bool,
    /// Kind of the failure that poisoned the encoder, if known.
    last_error_kind: Option<ErrorKind>,
    /// Message of the failure that poisoned the encoder, if known.
    last_error_message: Option<String>,
    /// Whether the frame header has already been written to `drain`.
    frame_started: bool,
    /// Upper bound on emitted block sizes (upstream `ZSTD_c_targetCBlockSize`
    /// semantics; see `FrameCompressor::set_target_block_size`). `None` =
    /// the format's 128 KiB ceiling.
    target_block_size: Option<u32>,
    /// Total uncompressed size promised via `set_pledged_content_size`;
    /// `finish` rejects the frame when `bytes_consumed` differs from it.
    pledged_content_size: Option<u64>,
    /// Whether a pledged size is written into the header's
    /// `Frame_Content_Size` field (upstream `ZSTD_c_contentSizeFlag`).
    /// Pledge *enforcement* is independent of this flag — upstream
    /// validates consumed bytes against the pledge at frame end even
    /// when the header omits the field. Default `true`.
    content_size_flag: bool,
    /// Uncompressed bytes accepted so far, compared with the pledge in `finish`.
    bytes_consumed: u64,
    /// Effective strategy tag when a public-parameter
    /// [`Strategy`](crate::encoding::Strategy) override (#27) is active, mirroring
    /// [`FrameCompressor`](crate::encoding::FrameCompressor)'s field. `Some`
    /// survives frame start so the literal-compression gates run the same
    /// strategy the matcher does; `None` keeps the level-derived tag.
    strategy_override: Option<crate::encoding::strategy::StrategyTag>,
    /// `ZSTD_f_zstd1_magicless` — omit the 4-byte magic number prefix.
    /// Default false. See [`Self::set_magicless`].
    magicless: bool,
    /// Whether to emit a trailing XXH64 content checksum and set the frame
    /// header's `Content_Checksum_flag` (upstream `ZSTD_c_checksumFlag`).
    /// Default `false`, matching the upstream library default. The flag only
    /// takes effect with the `hash` feature enabled; without it no checksum
    /// is emitted regardless. See [`Self::set_content_checksum`].
    content_checksum: bool,
    /// Dictionary applied to the frame (upstream zstd `ZSTD_CCtx_loadDictionary` on a
    /// streaming context). `None` = no dictionary. Set before the first write.
    dictionary: Option<EncoderDictionary>,
    /// Whether the frame header records the attached dictionary's ID
    /// (upstream `ZSTD_c_dictIDFlag`). Default `true`. Raw-content
    /// dictionaries (upstream `ZSTD_CCtx_refPrefix`) carry a synthetic
    /// non-zero ID that must not reach the wire, so their attach path
    /// turns this off. See [`Self::set_dictionary_id_flag`].
    dictionary_id_flag: bool,
    /// Encoder entropy tables (literals Huffman + LL/ML/OF FSE "previous"
    /// tables) the dictionary seeds into the first block, derived once when the
    /// dictionary is attached so each frame start is a cheap clone.
    dictionary_entropy_cache: Option<CachedDictionaryEntropy>,
    /// Running XXH64 (seed 0), reset at frame start; `finish` writes its low
    /// 32 bits as the checksum trailer. Presumably fed with each chunk of
    /// input by `write` — confirm against the `Write` impl.
    #[cfg(feature = "hash")]
    hasher: XxHash64,
}
78
79impl<W: Write> StreamingEncoder<W, MatchGeneratorDriver> {
80    /// Creates a streaming encoder backed by the default match generator.
81    ///
82    /// The encoder writes compressed bytes into `drain` and applies `compression_level`
83    /// to all subsequently written blocks.
84    pub fn new(drain: W, compression_level: CompressionLevel) -> Self {
85        Self::new_with_matcher(
86            MatchGeneratorDriver::new(MAX_BLOCK_SIZE as usize, 1),
87            drain,
88            compression_level,
89        )
90    }
91
92    /// Configure fine-grained compression parameters (#27): resets the level to
93    /// the parameters' level and installs the per-knob overrides (window / hash
94    /// / chain / search logs, strategy, long-distance matching) applied at the
95    /// next frame. Mirrors [`FrameCompressor::set_parameters`]. Must be called
96    /// before the first [`write`](Write::write). Only the built-in
97    /// `MatchGeneratorDriver` exposes the override knobs, so this lives on the
98    /// default-matcher impl.
99    pub fn set_parameters(
100        &mut self,
101        params: &crate::encoding::CompressionParameters,
102    ) -> Result<(), Error> {
103        self.ensure_open()?;
104        if self.frame_started {
105            return Err(invalid_input_error(
106                "compression parameters must be set before the first write",
107            ));
108        }
109        self.compression_level = params.level();
110        let overrides = params.overrides();
111        // Persist the strategy override so `ensure_frame_started`'s level-based
112        // resync does not discard it (matching `FrameCompressor::set_parameters`).
113        self.strategy_override = overrides.strategy.map(|s| s.tag());
114        self.state.strategy_tag = self.strategy_override.unwrap_or_else(|| {
115            crate::encoding::strategy::StrategyTag::for_compression_level(self.compression_level)
116        });
117        self.state.matcher.set_param_overrides(Some(overrides));
118        Ok(())
119    }
120}
121
122impl<W: Write, M: Matcher> StreamingEncoder<W, M> {
123    /// Creates a streaming encoder with an explicitly provided matcher implementation.
124    ///
125    /// This constructor is primarily intended for tests and advanced callers that need
126    /// custom match-window behavior.
127    pub fn new_with_matcher(matcher: M, drain: W, compression_level: CompressionLevel) -> Self {
128        Self {
129            drain: Some(drain),
130            compression_level,
131            state: CompressState {
132                matcher,
133                last_huff_table: None,
134                huff_table_spare: None,
135                fse_tables: FseTables::new(),
136                block_scratch: crate::encoding::blocks::CompressedBlockScratch::new(),
137                offset_hist: [1, 4, 8],
138                strategy_tag: crate::encoding::strategy::StrategyTag::for_compression_level(
139                    compression_level,
140                ),
141            },
142            pending: Vec::new(),
143            encoded_scratch: Vec::new(),
144            errored: false,
145            last_error_kind: None,
146            last_error_message: None,
147            frame_started: false,
148            target_block_size: None,
149            pledged_content_size: None,
150            content_size_flag: true,
151            bytes_consumed: 0,
152            strategy_override: None,
153            magicless: false,
154            content_checksum: false,
155            dictionary: None,
156            dictionary_id_flag: true,
157            dictionary_entropy_cache: None,
158            #[cfg(feature = "hash")]
159            hasher: XxHash64::with_seed(0),
160        }
161    }
162
163    /// Set an upper bound on each physical block's payload (semantics of
164    /// upstream `ZSTD_c_targetCBlockSize`): every block carries at most
165    /// `target` payload bytes, +3-byte block header on the wire — the
166    /// upstream knob is likewise a convergence target for block sizing,
167    /// not a cap on header-inclusive wire bytes. Clamped to
168    /// `[MIN_TARGET_BLOCK_SIZE, MAX_BLOCK_SIZE]`; mirrors
169    /// `FrameCompressor::set_target_block_size`. Must be set before the
170    /// first write.
171    pub fn set_target_block_size(&mut self, target: Option<u32>) -> Result<(), Error> {
172        self.ensure_open()?;
173        if self.frame_started {
174            return Err(invalid_input_error(
175                "the block-size target must be set before the first write",
176            ));
177        }
178        self.target_block_size = target.map(|t| {
179            t.clamp(
180                crate::common::MIN_TARGET_BLOCK_SIZE,
181                crate::common::MAX_BLOCK_SIZE,
182            )
183        });
184        Ok(())
185    }
186
187    /// Enable or disable the trailing XXH64 content checksum
188    /// (upstream `ZSTD_c_checksumFlag`). Default `false`, matching the
189    /// upstream library default (`ZSTD_c_checksumFlag = 0`). Must be called
190    /// before the first [`write`](Write::write); once the frame header is
191    /// emitted the flag is fixed, so a late change returns an error rather
192    /// than producing a header/trailer mismatch. Without the `hash` feature
193    /// no checksum is emitted regardless.
194    pub fn set_content_checksum(&mut self, emit: bool) -> Result<(), Error> {
195        self.ensure_open()?;
196        if self.frame_started {
197            return Err(invalid_input_error(
198                "content checksum must be set before the first write",
199            ));
200        }
201        self.content_checksum = emit;
202        Ok(())
203    }
204
205    /// Enable or disable magicless frame format (`ZSTD_f_zstd1_magicless`).
206    ///
207    /// When set to `true`, the frame header serialized by this encoder
208    /// omits the 4-byte magic number prefix. Must be called BEFORE the
209    /// first [`write`](Write::write) call; calling it after the frame
210    /// header has already been emitted returns an error so the caller
211    /// can't be misled into thinking they produced a magicless stream.
212    pub fn set_magicless(&mut self, magicless: bool) -> Result<(), Error> {
213        self.ensure_open()?;
214        if self.frame_started {
215            return Err(invalid_input_error(
216                "magicless format must be set before the first write",
217            ));
218        }
219        self.magicless = magicless;
220        Ok(())
221    }
222
223    /// Pledge the total uncompressed content size for this frame.
224    ///
225    /// When set, the frame header will include a `Frame_Content_Size` field.
226    /// This enables decoders to pre-allocate output buffers.
227    /// The pledged size is also forwarded as a source-size hint to the
228    /// matcher so small inputs can use smaller matching tables.
229    ///
230    /// Must be called **before** the first [`write`](Write::write) call;
231    /// calling it after the frame header has already been emitted returns an
232    /// error.
233    pub fn set_pledged_content_size(&mut self, size: u64) -> Result<(), Error> {
234        self.ensure_open()?;
235        if self.frame_started {
236            return Err(invalid_input_error(
237                "pledged content size must be set before the first write",
238            ));
239        }
240        self.pledged_content_size = Some(size);
241        // Also use pledged size as source-size hint so the matcher
242        // can select smaller tables for small inputs.
243        self.state.matcher.set_source_size_hint(size);
244        Ok(())
245    }
246
247    /// Control whether the pledged size is written into the header's
248    /// `Frame_Content_Size` field (upstream `ZSTD_c_contentSizeFlag`,
249    /// default on). With the flag off the header omits the field, but a
250    /// pledge set via [`set_pledged_content_size`](Self::set_pledged_content_size)
251    /// is still enforced against the bytes actually written. Must be
252    /// called before the first [`write`](Write::write).
253    pub fn set_content_size_flag(&mut self, emit: bool) -> Result<(), Error> {
254        self.ensure_open()?;
255        if self.frame_started {
256            return Err(invalid_input_error(
257                "content size flag must be set before the first write",
258            ));
259        }
260        self.content_size_flag = emit;
261        Ok(())
262    }
263
264    /// Provide a hint about the total uncompressed size for the next frame.
265    ///
266    /// Unlike [`set_pledged_content_size`](Self::set_pledged_content_size),
267    /// this does **not** enforce that exactly `size` bytes are written; it
268    /// may reduce matcher tables, advertised frame window, and block sizing
269    /// for small inputs. Must be called before the first
270    /// [`write`](Write::write).
271    pub fn set_source_size_hint(&mut self, size: u64) -> Result<(), Error> {
272        self.ensure_open()?;
273        if self.frame_started {
274            return Err(invalid_input_error(
275                "source size hint must be set before the first write",
276            ));
277        }
278        self.state.matcher.set_source_size_hint(size);
279        Ok(())
280    }
281
282    /// Attach a serialized dictionary blob to the frame (upstream zstd
283    /// `ZSTD_CCtx_loadDictionary` on a streaming context). The dictionary primes
284    /// the match-finder and seeds the first block's entropy tables + repeat
285    /// offsets, and its ID is written into the frame header. Must be called
286    /// before the first [`write`](Write::write); the parsed dictionary must have
287    /// a non-zero ID and non-zero repeat offsets.
288    pub fn set_dictionary_from_bytes(&mut self, raw_dictionary: &[u8]) -> Result<(), Error> {
289        let dict = EncoderDictionary::from_bytes(raw_dictionary)
290            .map_err(|err| invalid_input_error(&alloc::format!("invalid dictionary: {err:?}")))?;
291        self.set_encoder_dictionary(dict)
292    }
293
294    /// Whether the frame header records the dictionary ID when a dictionary
295    /// is attached (upstream `ZSTD_c_dictIDFlag` semantics; default `true`).
296    /// Mirrors [`FrameCompressor::set_dictionary_id_flag`]. Decoders can still
297    /// decode such frames by supplying the dictionary explicitly.
298    pub fn set_dictionary_id_flag(&mut self, emit: bool) -> Result<(), Error> {
299        self.ensure_open()?;
300        if self.frame_started {
301            return Err(invalid_input_error(
302                "dictionary ID flag must be set before the first write",
303            ));
304        }
305        self.dictionary_id_flag = emit;
306        Ok(())
307    }
308
309    /// Attach an already-parsed [`EncoderDictionary`] to the frame. See
310    /// [`set_dictionary_from_bytes`](Self::set_dictionary_from_bytes); must be
311    /// called before the first write.
312    pub fn set_encoder_dictionary(&mut self, dict: EncoderDictionary) -> Result<(), Error> {
313        self.ensure_open()?;
314        if self.frame_started {
315            return Err(invalid_input_error(
316                "dictionary must be attached before the first write",
317            ));
318        }
319        let inner = &dict.inner;
320        if inner.id == 0 {
321            return Err(invalid_input_error("dictionary has a zero ID"));
322        }
323        if inner.offset_hist.contains(&0) {
324            return Err(invalid_input_error(
325                "dictionary carries a zero repeat offset",
326            ));
327        }
328        self.dictionary_entropy_cache = Some(CachedDictionaryEntropy::from_dictionary(inner));
329        self.dictionary = Some(dict);
330        Ok(())
331    }
332
333    /// Returns an immutable reference to the wrapped output drain.
334    ///
335    /// The drain remains available for the encoder lifetime; [`finish`](Self::finish)
336    /// consumes the encoder and returns ownership of the drain.
337    pub fn get_ref(&self) -> &W {
338        self.drain
339            .as_ref()
340            .expect("streaming encoder drain is present until finish consumes self")
341    }
342
343    /// Total heap bytes this encoder's allocations hold, excluding the
344    /// inline struct and the drain `W` (whose footprint the owner can
345    /// measure through [`get_ref`](Self::get_ref)): match-finder tables /
346    /// history / recycled buffers, retained Huffman tables, the staging
347    /// `pending` / `encoded_scratch` buffers, the retained dictionary
348    /// content, and the cached dictionary entropy tables. Mirrors
349    /// `FrameCompressor::heap_size` so a context can report its true
350    /// footprint through `ZSTD_sizeof_CCtx`.
351    pub fn heap_size(&self) -> usize {
352        let mut total = self.state.matcher.heap_size();
353        total += self
354            .state
355            .last_huff_table
356            .as_ref()
357            .map_or(0, |table| table.heap_size());
358        total += self
359            .state
360            .huff_table_spare
361            .as_ref()
362            .map_or(0, |table| table.heap_size());
363        total += self.pending.capacity();
364        total += self.encoded_scratch.capacity();
365        total += self
366            .dictionary
367            .as_ref()
368            .map_or(0, |d| d.inner.dict_content.capacity());
369        total += self
370            .dictionary_entropy_cache
371            .as_ref()
372            .map_or(0, CachedDictionaryEntropy::heap_size);
373        total
374    }
375
376    /// Returns a mutable reference to the wrapped output drain.
377    ///
378    /// It is inadvisable to directly write to the underlying writer, as doing
379    /// so would corrupt the zstd frame being assembled by the encoder.
380    ///
381    /// The drain remains available for the encoder lifetime; [`finish`](Self::finish)
382    /// consumes the encoder and returns ownership of the drain.
383    pub fn get_mut(&mut self) -> &mut W {
384        self.drain
385            .as_mut()
386            .expect("streaming encoder drain is present until finish consumes self")
387    }
388
389    /// Finalizes the current zstd frame and returns the wrapped output drain.
390    ///
391    /// If no payload was written yet, this still emits a valid empty frame.
392    /// Calling this method consumes the encoder.
393    pub fn finish(mut self) -> Result<W, Error> {
394        self.ensure_open()?;
395
396        // Validate the pledge before finalizing the frame. If finish() is
397        // called before any writes, this also avoids emitting a header with
398        // an incorrect FCS into the drain on mismatch.
399        if let Some(pledged) = self.pledged_content_size
400            && self.bytes_consumed != pledged
401        {
402            return Err(invalid_input_error(
403                "pledged content size does not match bytes consumed",
404            ));
405        }
406
407        self.ensure_frame_started()?;
408
409        if self.pending.is_empty() {
410            self.write_empty_last_block()
411                .map_err(|err| self.fail(err))?;
412        } else {
413            self.emit_pending_block(true)?;
414        }
415
416        let mut drain = self
417            .drain
418            .take()
419            .expect("streaming encoder drain must be present when finishing");
420
421        #[cfg(feature = "hash")]
422        if self.content_checksum {
423            let checksum = self.hasher.finish() as u32;
424            drain
425                .write_all(&checksum.to_le_bytes())
426                .map_err(|err| self.fail(err))?;
427        }
428
429        drain.flush().map_err(|err| self.fail(err))?;
430        Ok(drain)
431    }
432
433    fn ensure_open(&self) -> Result<(), Error> {
434        if self.errored {
435            return Err(self.sticky_error());
436        }
437        Ok(())
438    }
439
440    // Cold path (only reached after poisoning). The format!() calls still allocate
441    // in no_std even though error_with_kind_message/other_error_owned drop the
442    // message; this is acceptable on an error recovery path to keep match arms simple.
443    fn sticky_error(&self) -> Error {
444        match (self.last_error_kind, self.last_error_message.as_deref()) {
445            (Some(kind), Some(message)) => error_with_kind_message(
446                kind,
447                format!(
448                    "streaming encoder is in an errored state due to previous {kind:?} failure: {message}"
449                ),
450            ),
451            (Some(kind), None) => error_from_kind(kind),
452            (None, Some(message)) => other_error_owned(format!(
453                "streaming encoder is in an errored state: {message}"
454            )),
455            (None, None) => other_error("streaming encoder is in an errored state"),
456        }
457    }
458
459    fn drain_mut(&mut self) -> Result<&mut W, Error> {
460        self.drain
461            .as_mut()
462            .ok_or_else(|| other_error("streaming encoder has no active drain"))
463    }
464
    /// Starts the frame on first use; a no-op once `frame_started` is set.
    ///
    /// In order, it:
    /// 1. validates the compression level;
    /// 2. decides whether the attached dictionary is actually in play;
    /// 3. resets the matcher, passing the dictionary size hint *before* `reset`;
    /// 4. seeds the repeat offsets, primes the matcher with the dictionary, and
    ///    seeds or clears the first block's entropy tables;
    /// 5. resyncs the strategy tag and resets the content hasher;
    /// 6. builds and writes the frame header.
    ///
    /// The order matters: each step reads state produced by the previous one.
    ///
    /// # Errors
    /// - A matcher reporting `window_size == 0` yields an invalid-input error.
    ///   This error is NOT routed through `fail`, so the encoder is not
    ///   poisoned, and the matcher has already been reset at that point.
    /// - A failure writing the header is routed through `fail` and poisons the
    ///   encoder.
    fn ensure_frame_started(&mut self) -> Result<(), Error> {
        if self.frame_started {
            return Ok(());
        }

        self.ensure_level_supported()?;
        // A dictionary is only active when it can actually be primed: the level
        // compresses (not `Uncompressed`) AND the matcher supports priming AND a
        // dictionary is attached. Mirrors `FrameCompressor`'s `use_dictionary_state`
        // so a streaming frame never advertises a `Dictionary_ID`, disables
        // single-segment, or seeds dict entropy/offsets unless the dictionary is
        // genuinely in play (otherwise it would emit frames that needlessly
        // require a dictionary at decode time).
        let use_dictionary_state =
            !matches!(self.compression_level, CompressionLevel::Uncompressed)
                && self.state.matcher.supports_dictionary_priming()
                && self.dictionary.is_some();
        // The dictionary content size drives dict-tier match-finder sizing
        // (consumed inside `reset`), so hand it over BEFORE reset.
        if use_dictionary_state && let Some(dict) = self.dictionary.as_ref() {
            self.state
                .matcher
                .set_dictionary_size_hint(dict.inner.dict_content.len());
        }
        self.state.matcher.reset(self.compression_level);
        // Seed the repeat-offset history from the dictionary (upstream zstd
        // `ZSTD_compress_insertDictionary`), or the default rep codes otherwise.
        self.state.offset_hist = if use_dictionary_state {
            self.dictionary
                .as_ref()
                .map(|dict| dict.inner.offset_hist)
                .unwrap_or([1, 4, 8])
        } else {
            [1, 4, 8]
        };
        // Prime the match-finder with the dictionary content + offsets.
        // `dict` borrows `self.dictionary`; `self.state.matcher` is a disjoint
        // field, so the immutable dict borrow and the mutable matcher borrow
        // coexist (field-level borrow splitting) with no conflict.
        if use_dictionary_state && let Some(dict) = self.dictionary.as_ref() {
            let offset_hist = dict.inner.offset_hist;
            self.state
                .matcher
                .prime_with_dictionary(dict.inner.dict_content.as_slice(), offset_hist);
        }
        // Seed the first block's entropy from the dictionary's cached encoder
        // tables (upstream zstd `cdict->cBlockState`), or clear to defaults.
        if use_dictionary_state && let Some(cache) = self.dictionary_entropy_cache.as_ref() {
            // `clone_from` reuses the existing allocations of the previous tables.
            self.state.last_huff_table.clone_from(&cache.huff);
            self.state
                .fse_tables
                .ll_previous
                .clone_from(&cache.ll_previous);
            self.state
                .fse_tables
                .ml_previous
                .clone_from(&cache.ml_previous);
            self.state
                .fse_tables
                .of_previous
                .clone_from(&cache.of_previous);
            // Only `Custom` (dictionary-provided) FSE tables are forwarded to the
            // matcher; every other `PreviousFseTable` variant is passed as `None`.
            let ll_entropy = match cache.ll_previous.as_ref() {
                Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
                _ => None,
            };
            let ml_entropy = match cache.ml_previous.as_ref() {
                Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
                _ => None,
            };
            let of_entropy = match cache.of_previous.as_ref() {
                Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
                _ => None,
            };
            self.state.matcher.seed_dictionary_entropy(
                self.state.last_huff_table.as_ref(),
                ll_entropy,
                ml_entropy,
                of_entropy,
            );
        } else {
            self.state.last_huff_table = None;
            self.state.fse_tables.ll_previous = None;
            self.state.fse_tables.ml_previous = None;
            self.state.fse_tables.of_previous = None;
        }
        // Sync `state.strategy_tag` from the active compression level so the
        // literal-compression gates (`min_literals_to_compress`, `min_gain`
        // in `encoding::blocks::compressed`) see the correct strategy for
        // every frame. Mirrors `FrameCompressor::compress` and keeps both
        // entry points byte-equivalent at the gate level. A public-parameter
        // strategy override (#27) wins over the level's derived tag so the
        // gates see the strategy the matcher actually runs.
        self.state.strategy_tag = self.strategy_override.unwrap_or_else(|| {
            crate::encoding::strategy::StrategyTag::for_compression_level(self.compression_level)
        });
        // Fresh XXH64 state (seed 0) for this frame's content checksum.
        #[cfg(feature = "hash")]
        {
            self.hasher = XxHash64::with_seed(0);
        }

        // Queried after `reset`, so it reflects the level and size hints applied above.
        let window_size = self.state.matcher.window_size();
        if window_size == 0 {
            return Err(invalid_input_error(
                "matcher reported window_size == 0, which is invalid",
            ));
        }

        // Single-segment is incompatible with a dictionary (the dictionary
        // pushes referenceable history before the content, so the frame needs
        // an explicit window descriptor); gate it off when a dict is attached,
        // mirroring `FrameCompressor`'s `!use_dictionary_state` guard.
        // Single-segment also requires the FCS field to be present
        // (`content_size_flag`): the layout drops the window descriptor,
        // so the header must carry the content size for decoders to size
        // their window.
        // It is only chosen for pledges of 512 bytes ..= 16 KiB that fit in the
        // matcher window. NOTE(review): presumably the same bounds as
        // `FrameCompressor` uses; confirm they stay in sync.
        let single_segment = self.content_size_flag
            && !use_dictionary_state
            && self
                .pledged_content_size
                .map(|size| (512..=(1 << 14)).contains(&size) && size <= window_size)
                .unwrap_or(false);

        let header = FrameHeader {
            frame_content_size: if self.content_size_flag {
                self.pledged_content_size
            } else {
                None
            },
            single_segment,
            // The checksum flag is only advertised when the trailer can be computed.
            content_checksum: cfg!(feature = "hash") && self.content_checksum,
            dictionary_id: if use_dictionary_state && self.dictionary_id_flag {
                self.dictionary.as_ref().map(|dict| dict.inner.id as u64)
            } else {
                None
            },
            // Single-segment frames carry no window descriptor.
            window_size: if single_segment {
                None
            } else {
                Some(window_size)
            },
            magicless: self.magicless,
        };
        let mut encoded_header = Vec::new();
        header.serialize(&mut encoded_header);
        // A header write failure poisons the encoder via `fail`.
        self.drain_mut()
            .and_then(|drain| drain.write_all(&encoded_header))
            .map_err(|err| self.fail(err))?;

        self.frame_started = true;
        Ok(())
    }
616
617    fn block_capacity(&self) -> usize {
618        let matcher_window = self.state.matcher.window_size() as usize;
619        let ceiling = self
620            .target_block_size
621            .map_or(MAX_BLOCK_SIZE as usize, |t| t as usize);
622        core::cmp::max(1, core::cmp::min(matcher_window, ceiling))
623    }
624
625    fn allocate_pending_space(&mut self, block_capacity: usize) -> Vec<u8> {
626        let mut space = match self.compression_level {
627            CompressionLevel::Fastest
628            | CompressionLevel::Default
629            | CompressionLevel::Better
630            | CompressionLevel::Best
631            | CompressionLevel::Level(_) => self.state.matcher.get_next_space(),
632            CompressionLevel::Uncompressed => Vec::new(),
633        };
634        space.clear();
635        if space.capacity() > block_capacity {
636            space.shrink_to(block_capacity);
637        }
638        if space.capacity() < block_capacity {
639            space.reserve(block_capacity - space.capacity());
640        }
641        space
642    }
643
644    fn emit_full_pending_block(
645        &mut self,
646        block_capacity: usize,
647        consumed: usize,
648    ) -> Option<Result<usize, Error>> {
649        if self.pending.len() != block_capacity {
650            return None;
651        }
652
653        let new_pending = self.allocate_pending_space(block_capacity);
654        let full_block = mem::replace(&mut self.pending, new_pending);
655        if let Err((err, restored_block)) = self.encode_block(full_block, false) {
656            self.pending = restored_block;
657            let err = self.fail(err);
658            if consumed > 0 {
659                return Some(Ok(consumed));
660            }
661            return Some(Err(err));
662        }
663        None
664    }
665
666    fn emit_pending_block(&mut self, last_block: bool) -> Result<(), Error> {
667        let block = mem::take(&mut self.pending);
668        if let Err((err, restored_block)) = self.encode_block(block, last_block) {
669            self.pending = restored_block;
670            return Err(self.fail(err));
671        }
672        if !last_block {
673            let block_capacity = self.block_capacity();
674            self.pending = self.allocate_pending_space(block_capacity);
675        }
676        Ok(())
677    }
678
679    // Exhaustive match kept intentionally: adding a new CompressionLevel
680    // variant will produce a compile error here, forcing the developer to
681    // decide whether the streaming encoder supports it before shipping.
682    fn ensure_level_supported(&self) -> Result<(), Error> {
683        match self.compression_level {
684            CompressionLevel::Uncompressed
685            | CompressionLevel::Fastest
686            | CompressionLevel::Default
687            | CompressionLevel::Better
688            | CompressionLevel::Best
689            | CompressionLevel::Level(_) => Ok(()),
690        }
691    }
692
    /// Encodes one block, header included, and writes it to the drain.
    ///
    /// - Empty input is emitted as a zero-length `Raw` block.
    /// - At `CompressionLevel::Uncompressed` the bytes are stored verbatim in a
    ///   `Raw` block.
    /// - Every other level is encoded via `compress_block_encoded`.
    ///
    /// When content checksumming is enabled, the checksum is updated only
    /// after the drain has accepted the encoded bytes.
    ///
    /// # Errors
    ///
    /// If the drain is unavailable or the write fails, returns the I/O error
    /// together with the block's uncompressed bytes so the caller can restore
    /// them into `pending`. This method does not poison the encoder itself;
    /// callers decide whether to call `fail`.
    fn encode_block(
        &mut self,
        uncompressed_data: Vec<u8>,
        last_block: bool,
    ) -> Result<(), (Error, Vec<u8>)> {
        // Held in an Option so the compressing branch can move the block out
        // while the raw branches only borrow it.
        let mut raw_block = Some(uncompressed_data);
        // Borrow the reusable scratch buffer for this call. It is swapped back
        // on every exit path so its capacity is reused across blocks.
        let mut encoded = Vec::new();
        mem::swap(&mut encoded, &mut self.encoded_scratch);
        encoded.clear();
        // Room for a full block's payload plus the 3-byte block header.
        let needed_capacity = self.block_capacity() + 3;
        if encoded.capacity() < needed_capacity {
            encoded.reserve(needed_capacity.saturating_sub(encoded.len()));
        }
        // Set once the block has been handed to `compress_block_encoded` by
        // value. After that, its bytes are read back via `get_last_space()`.
        let mut moved_into_matcher = false;
        if raw_block.as_ref().is_some_and(|block| block.is_empty()) {
            // Empty block: header only, no payload.
            let header = BlockHeader {
                last_block,
                block_type: crate::blocks::block::BlockType::Raw,
                block_size: 0,
            };
            header.serialize(&mut encoded);
        } else {
            match self.compression_level {
                CompressionLevel::Uncompressed => {
                    // Raw block: header followed by the bytes verbatim.
                    let block = raw_block.as_ref().expect("raw block missing");
                    let header = BlockHeader {
                        last_block,
                        block_type: crate::blocks::block::BlockType::Raw,
                        block_size: block.len() as u32,
                    };
                    header.serialize(&mut encoded);
                    encoded.extend_from_slice(block);
                }
                CompressionLevel::Fastest
                | CompressionLevel::Default
                | CompressionLevel::Better
                | CompressionLevel::Best
                | CompressionLevel::Level(_) => {
                    let block = raw_block.take().expect("raw block missing");
                    debug_assert!(!block.is_empty(), "empty blocks handled above");
                    // A primed dictionary makes "incompressible-looking" blocks
                    // matchable, so the raw-fast-path must NOT fire. But a dict is
                    // only PRIMED when the matcher supports priming — a non-priming
                    // matcher ignores the attached dictionary, so the raw-fast-path
                    // must stay enabled for it. (This arm is already non-Uncompressed.)
                    // Mirrors `FrameCompressor`'s `dict_active`.
                    let dict_active = self.dictionary.is_some()
                        && self.state.matcher.supports_dictionary_priming();
                    compress_block_encoded(
                        &mut self.state,
                        self.compression_level,
                        last_block,
                        block,
                        &mut encoded,
                        dict_active,
                        // No FrameEmitInfo on the streaming encoder path — it
                        // does not surface per-block layout, so no sidecar.
                        #[cfg(feature = "lsm")]
                        None,
                        #[cfg(all(feature = "lsm", feature = "hash"))]
                        None,
                    );
                    moved_into_matcher = true;
                }
            }
        }

        if let Err(err) = self.drain_mut().and_then(|drain| drain.write_all(&encoded)) {
            // Return the scratch buffer before bailing out.
            encoded.clear();
            mem::swap(&mut encoded, &mut self.encoded_scratch);
            // Hand the uncompressed bytes back to the caller. The compressed
            // path relies on the matcher's last space still holding this block.
            // NOTE(review): confirm that custom `Matcher` impls uphold this.
            let restored = if moved_into_matcher {
                self.state.matcher.get_last_space().to_vec()
            } else {
                raw_block.unwrap_or_default()
            };
            return Err((err, restored));
        }

        // Only checksum data that actually reached the drain.
        if moved_into_matcher {
            // `get_last_space` takes `&mut self` on the matcher, so the bytes
            // are fed to the hasher field directly rather than through
            // `self.hash_block(..)` (presumably to avoid a double mutable borrow).
            #[cfg(feature = "hash")]
            if self.content_checksum {
                self.hasher.write(self.state.matcher.get_last_space());
            }
        } else {
            self.hash_block(raw_block.as_deref().unwrap_or(&[]));
        }
        encoded.clear();
        mem::swap(&mut encoded, &mut self.encoded_scratch);
        Ok(())
    }
783
784    fn write_empty_last_block(&mut self) -> Result<(), Error> {
785        self.encode_block(Vec::new(), true).map_err(|(err, _)| err)
786    }
787
788    fn fail(&mut self, err: Error) -> Error {
789        self.errored = true;
790        if self.last_error_kind.is_none() {
791            self.last_error_kind = Some(err.kind());
792        }
793        if self.last_error_message.is_none() {
794            self.last_error_message = Some(err.to_string());
795        }
796        err
797    }
798
799    #[cfg(feature = "hash")]
800    fn hash_block(&mut self, uncompressed_data: &[u8]) {
801        if self.content_checksum {
802            self.hasher.write(uncompressed_data);
803        }
804    }
805
806    #[cfg(not(feature = "hash"))]
807    fn hash_block(&mut self, _uncompressed_data: &[u8]) {}
808}
809
810impl<W: Write, M: Matcher> Write for StreamingEncoder<W, M> {
811    fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
812        self.ensure_open()?;
813        if buf.is_empty() {
814            return Ok(0);
815        }
816
817        // Check pledge before emitting the frame header so that a misuse
818        // like set_pledged_content_size(0) + write(non_empty) doesn't leave
819        // a partially-written header in the drain.
820        if let Some(pledged) = self.pledged_content_size
821            && self.bytes_consumed >= pledged
822        {
823            return Err(invalid_input_error(
824                "write would exceed pledged content size",
825            ));
826        }
827
828        self.ensure_frame_started()?;
829
830        // Enforce pledged upper bound: truncate the accepted slice to the
831        // remaining allowance so that partial-write semantics are honored
832        // (return Ok(n) with n < buf.len()) instead of failing the full call.
833        let buf = if let Some(pledged) = self.pledged_content_size {
834            let remaining_allowed = pledged
835                .checked_sub(self.bytes_consumed)
836                .ok_or_else(|| invalid_input_error("bytes consumed exceed pledged content size"))?;
837            if remaining_allowed == 0 {
838                return Err(invalid_input_error(
839                    "write would exceed pledged content size",
840                ));
841            }
842            let accepted = core::cmp::min(
843                buf.len(),
844                usize::try_from(remaining_allowed).unwrap_or(usize::MAX),
845            );
846            &buf[..accepted]
847        } else {
848            buf
849        };
850
851        let block_capacity = self.block_capacity();
852        if self.pending.capacity() == 0 {
853            self.pending = self.allocate_pending_space(block_capacity);
854        }
855        let mut remaining = buf;
856        let mut consumed = 0usize;
857
858        while !remaining.is_empty() {
859            if let Some(result) = self.emit_full_pending_block(block_capacity, consumed) {
860                return result;
861            }
862
863            let available = block_capacity - self.pending.len();
864            let to_take = core::cmp::min(remaining.len(), available);
865            if to_take == 0 {
866                break;
867            }
868            self.pending.extend_from_slice(&remaining[..to_take]);
869            remaining = &remaining[to_take..];
870            consumed += to_take;
871
872            if let Some(result) = self.emit_full_pending_block(block_capacity, consumed) {
873                if let Ok(n) = &result {
874                    self.bytes_consumed += *n as u64;
875                }
876                return result;
877            }
878        }
879        self.bytes_consumed += consumed as u64;
880        Ok(consumed)
881    }
882
883    fn flush(&mut self) -> Result<(), Error> {
884        self.ensure_open()?;
885        if self.pending.is_empty() {
886            return self
887                .drain_mut()
888                .and_then(|drain| drain.flush())
889                .map_err(|err| self.fail(err));
890        }
891        self.ensure_frame_started()?;
892        self.emit_pending_block(false)?;
893        self.drain_mut()
894            .and_then(|drain| drain.flush())
895            .map_err(|err| self.fail(err))
896    }
897}
898
/// Builds a bare error that carries only `kind` and no custom message.
fn error_from_kind(kind: ErrorKind) -> Error {
    kind.into()
}
902
903fn error_with_kind_message(kind: ErrorKind, message: String) -> Error {
904    #[cfg(feature = "std")]
905    {
906        Error::new(kind, message)
907    }
908    #[cfg(not(feature = "std"))]
909    {
910        Error::new(kind, alloc::boxed::Box::new(message))
911    }
912}
913
914fn invalid_input_error(message: &str) -> Error {
915    #[cfg(feature = "std")]
916    {
917        Error::new(ErrorKind::InvalidInput, message)
918    }
919    #[cfg(not(feature = "std"))]
920    {
921        Error::new(
922            ErrorKind::Other,
923            alloc::boxed::Box::new(alloc::string::String::from(message)),
924        )
925    }
926}
927
928fn other_error_owned(message: String) -> Error {
929    #[cfg(feature = "std")]
930    {
931        Error::other(message)
932    }
933    #[cfg(not(feature = "std"))]
934    {
935        Error::new(ErrorKind::Other, alloc::boxed::Box::new(message))
936    }
937}
938
939fn other_error(message: &str) -> Error {
940    #[cfg(feature = "std")]
941    {
942        Error::other(message)
943    }
944    #[cfg(not(feature = "std"))]
945    {
946        Error::new(
947            ErrorKind::Other,
948            alloc::boxed::Box::new(alloc::string::String::from(message)),
949        )
950    }
951}
952
953#[cfg(test)]
954mod tests {
955    use crate::decoding::StreamingDecoder;
956    use crate::encoding::{CompressionLevel, Matcher, Sequence, StreamingEncoder};
957    use crate::io::{Error, ErrorKind, Read, Write};
958    use alloc::vec;
959    use alloc::vec::Vec;
960
961    struct TinyMatcher {
962        last_space: Vec<u8>,
963        window_size: u64,
964    }
965
966    impl TinyMatcher {
967        fn new(window_size: u64) -> Self {
968            Self {
969                last_space: Vec::new(),
970                window_size,
971            }
972        }
973    }
974
975    impl Matcher for TinyMatcher {
976        fn get_next_space(&mut self) -> Vec<u8> {
977            vec![0; self.window_size as usize]
978        }
979
980        fn get_last_space(&mut self) -> &[u8] {
981            self.last_space.as_slice()
982        }
983
984        fn commit_space(&mut self, space: Vec<u8>) {
985            self.last_space = space;
986        }
987
988        fn skip_matching(&mut self) {}
989
990        fn start_matching(&mut self, mut handle_sequence: impl for<'a> FnMut(Sequence<'a>)) {
991            handle_sequence(Sequence::Literals {
992                literals: self.last_space.as_slice(),
993            });
994        }
995
996        fn reset(&mut self, _level: CompressionLevel) {
997            self.last_space.clear();
998        }
999
1000        fn window_size(&self) -> u64 {
1001            self.window_size
1002        }
1003    }
1004
1005    struct FailingWriteOnce {
1006        writes: usize,
1007        fail_on_write_number: usize,
1008        sink: Vec<u8>,
1009    }
1010
1011    impl FailingWriteOnce {
1012        fn new(fail_on_write_number: usize) -> Self {
1013            Self {
1014                writes: 0,
1015                fail_on_write_number,
1016                sink: Vec::new(),
1017            }
1018        }
1019    }
1020
1021    impl Write for FailingWriteOnce {
1022        fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
1023            self.writes += 1;
1024            if self.writes == self.fail_on_write_number {
1025                return Err(super::other_error("injected write failure"));
1026            }
1027            self.sink.extend_from_slice(buf);
1028            Ok(buf.len())
1029        }
1030
1031        fn flush(&mut self) -> Result<(), Error> {
1032            Ok(())
1033        }
1034    }
1035
1036    struct FailingWithKind {
1037        writes: usize,
1038        fail_on_write_number: usize,
1039        kind: ErrorKind,
1040    }
1041
1042    impl FailingWithKind {
1043        fn new(fail_on_write_number: usize, kind: ErrorKind) -> Self {
1044            Self {
1045                writes: 0,
1046                fail_on_write_number,
1047                kind,
1048            }
1049        }
1050    }
1051
1052    impl Write for FailingWithKind {
1053        fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
1054            self.writes += 1;
1055            if self.writes == self.fail_on_write_number {
1056                return Err(Error::from(self.kind));
1057            }
1058            Ok(buf.len())
1059        }
1060
1061        fn flush(&mut self) -> Result<(), Error> {
1062            Ok(())
1063        }
1064    }
1065
1066    struct PartialThenFailWriter {
1067        writes: usize,
1068        fail_on_write_number: usize,
1069        partial_prefix_len: usize,
1070        terminal_failure: bool,
1071        sink: Vec<u8>,
1072    }
1073
1074    impl PartialThenFailWriter {
1075        fn new(fail_on_write_number: usize, partial_prefix_len: usize) -> Self {
1076            Self {
1077                writes: 0,
1078                fail_on_write_number,
1079                partial_prefix_len,
1080                terminal_failure: false,
1081                sink: Vec::new(),
1082            }
1083        }
1084    }
1085
1086    impl Write for PartialThenFailWriter {
1087        fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
1088            if self.terminal_failure {
1089                return Err(super::other_error("injected terminal write failure"));
1090            }
1091
1092            self.writes += 1;
1093            if self.writes == self.fail_on_write_number {
1094                let written = core::cmp::min(self.partial_prefix_len, buf.len());
1095                if written > 0 {
1096                    self.sink.extend_from_slice(&buf[..written]);
1097                    self.terminal_failure = true;
1098                    return Ok(written);
1099                }
1100                return Err(super::other_error("injected terminal write failure"));
1101            }
1102
1103            self.sink.extend_from_slice(buf);
1104            Ok(buf.len())
1105        }
1106
1107        fn flush(&mut self) -> Result<(), Error> {
1108            Ok(())
1109        }
1110    }
1111
1112    /// Pre-write `set_magicless(true)` → emitted frame omits the
1113    /// magic prefix AND round-trips through a magicless-aware
1114    /// decoder.
1115    #[test]
1116    fn streaming_encoder_set_magicless_before_write_omits_magic_and_roundtrips() {
1117        use crate::common::MAGIC_NUM;
1118        let payload = b"streaming-magicless-roundtrip-".repeat(64);
1119
1120        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1121        encoder
1122            .set_magicless(true)
1123            .expect("set_magicless pre-write");
1124        encoder.write_all(&payload).unwrap();
1125        let compressed = encoder.finish().unwrap();
1126
1127        assert!(
1128            !compressed.starts_with(&MAGIC_NUM.to_le_bytes()),
1129            "magicless frame must omit the 4-byte magic prefix",
1130        );
1131
1132        let mut decoder = crate::decoding::FrameDecoder::new();
1133        decoder.set_magicless(true);
1134        let mut cursor: &[u8] = compressed.as_slice();
1135        decoder.init(&mut cursor).expect("magicless init");
1136        decoder
1137            .decode_blocks(&mut cursor, crate::decoding::BlockDecodingStrategy::All)
1138            .expect("decode_blocks");
1139        let mut decoded: Vec<u8> = Vec::new();
1140        decoder
1141            .collect_to_writer(&mut decoded)
1142            .expect("collect_to_writer");
1143        assert_eq!(decoded, payload);
1144    }
1145
1146    /// `set_magicless` after the first write MUST return an error
1147    /// (the frame header has already been emitted, flipping the flag
1148    /// can't affect the current frame). Mirrors
1149    /// `set_pledged_content_size` / `set_source_size_hint` semantics.
1150    #[test]
1151    fn streaming_encoder_set_magicless_after_first_write_errors() {
1152        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1153        encoder.write_all(b"first-block").unwrap();
1154        let err = encoder
1155            .set_magicless(true)
1156            .expect_err("set_magicless after first write must error");
1157        assert_eq!(
1158            err.kind(),
1159            crate::io::ErrorKind::InvalidInput,
1160            "expected InvalidInput when setting magicless after frame_started, got {err:?}",
1161        );
1162    }
1163
1164    #[test]
1165    fn streaming_encoder_roundtrip_multiple_writes() {
1166        let payload = b"streaming-encoder-roundtrip-".repeat(1024);
1167        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1168        for chunk in payload.chunks(313) {
1169            encoder.write_all(chunk).unwrap();
1170        }
1171        let compressed = encoder.finish().unwrap();
1172
1173        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1174        let mut decoded = Vec::new();
1175        decoder.read_to_end(&mut decoded).unwrap();
1176        assert_eq!(decoded, payload);
1177    }
1178
1179    #[test]
1180    fn flush_emits_nonempty_partial_output() {
1181        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1182        encoder.write_all(b"partial-block").unwrap();
1183        encoder.flush().unwrap();
1184        let flushed_len = encoder.get_ref().len();
1185        assert!(
1186            flushed_len > 0,
1187            "flush should emit header+partial block bytes"
1188        );
1189        let compressed = encoder.finish().unwrap();
1190        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1191        let mut decoded = Vec::new();
1192        decoder.read_to_end(&mut decoded).unwrap();
1193        assert_eq!(decoded, b"partial-block");
1194    }
1195
1196    #[test]
1197    fn flush_without_writes_does_not_emit_frame_header() {
1198        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1199        encoder.flush().unwrap();
1200        assert!(encoder.get_ref().is_empty());
1201    }
1202
1203    #[test]
1204    fn block_boundary_write_emits_block_in_same_call() {
1205        let mut boundary = StreamingEncoder::new_with_matcher(
1206            TinyMatcher::new(4),
1207            Vec::new(),
1208            CompressionLevel::Uncompressed,
1209        );
1210        let mut below = StreamingEncoder::new_with_matcher(
1211            TinyMatcher::new(4),
1212            Vec::new(),
1213            CompressionLevel::Uncompressed,
1214        );
1215
1216        boundary.write_all(b"ABCD").unwrap();
1217        below.write_all(b"ABC").unwrap();
1218
1219        let boundary_len = boundary.get_ref().len();
1220        let below_len = below.get_ref().len();
1221        assert!(
1222            boundary_len > below_len,
1223            "full block should be emitted immediately at block boundary"
1224        );
1225    }
1226
1227    #[test]
1228    fn finish_consumes_encoder_and_emits_frame() {
1229        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1230        encoder.write_all(b"abc").unwrap();
1231        let compressed = encoder.finish().unwrap();
1232        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1233        let mut decoded = Vec::new();
1234        decoder.read_to_end(&mut decoded).unwrap();
1235        assert_eq!(decoded, b"abc");
1236    }
1237
1238    #[test]
1239    fn finish_without_writes_emits_empty_frame() {
1240        let encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1241        let compressed = encoder.finish().unwrap();
1242        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1243        let mut decoded = Vec::new();
1244        decoder.read_to_end(&mut decoded).unwrap();
1245        assert!(decoded.is_empty());
1246    }
1247
1248    #[test]
1249    fn write_empty_buffer_returns_zero() {
1250        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1251        assert_eq!(encoder.write(&[]).unwrap(), 0);
1252        let _ = encoder.finish().unwrap();
1253    }
1254
1255    #[test]
1256    fn uncompressed_level_roundtrip() {
1257        let payload = b"uncompressed-streaming-roundtrip".repeat(64);
1258        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Uncompressed);
1259        for chunk in payload.chunks(41) {
1260            encoder.write_all(chunk).unwrap();
1261        }
1262        let compressed = encoder.finish().unwrap();
1263        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1264        let mut decoded = Vec::new();
1265        decoder.read_to_end(&mut decoded).unwrap();
1266        assert_eq!(decoded, payload);
1267    }
1268
1269    #[test]
1270    fn better_level_streaming_roundtrip() {
1271        let payload = b"better-level-streaming-test".repeat(256);
1272        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Better);
1273        for chunk in payload.chunks(53) {
1274            encoder.write_all(chunk).unwrap();
1275        }
1276        let compressed = encoder.finish().unwrap();
1277        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1278        let mut decoded = Vec::new();
1279        decoder.read_to_end(&mut decoded).unwrap();
1280        assert_eq!(decoded, payload);
1281    }
1282
1283    #[test]
1284    fn zero_window_matcher_returns_invalid_input_error() {
1285        let mut encoder = StreamingEncoder::new_with_matcher(
1286            TinyMatcher::new(0),
1287            Vec::new(),
1288            CompressionLevel::Fastest,
1289        );
1290        let err = encoder.write_all(b"payload").unwrap_err();
1291        assert_eq!(err.kind(), ErrorKind::InvalidInput);
1292    }
1293
1294    #[test]
1295    fn best_level_streaming_roundtrip() {
1296        // 200 KiB payload crosses the 128 KiB block boundary, exercising
1297        // multi-block emission and matcher state carry-over for Best.
1298        let payload = b"best-level-streaming-test".repeat(8 * 1024);
1299        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Best);
1300        for chunk in payload.chunks(53) {
1301            encoder.write_all(chunk).unwrap();
1302        }
1303        let compressed = encoder.finish().unwrap();
1304        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1305        let mut decoded = Vec::new();
1306        decoder.read_to_end(&mut decoded).unwrap();
1307        assert_eq!(decoded, payload);
1308    }
1309
1310    #[test]
1311    fn write_failure_poisoning_is_sticky() {
1312        let mut encoder = StreamingEncoder::new_with_matcher(
1313            TinyMatcher::new(4),
1314            FailingWriteOnce::new(1),
1315            CompressionLevel::Uncompressed,
1316        );
1317
1318        assert!(encoder.write_all(b"ABCD").is_err());
1319        assert!(encoder.flush().is_err());
1320        assert!(encoder.write_all(b"EFGH").is_err());
1321        assert_eq!(encoder.get_ref().sink.len(), 0);
1322        assert!(encoder.finish().is_err());
1323    }
1324
1325    #[test]
1326    fn poisoned_encoder_returns_original_error_kind() {
1327        let mut encoder = StreamingEncoder::new_with_matcher(
1328            TinyMatcher::new(4),
1329            FailingWithKind::new(1, ErrorKind::BrokenPipe),
1330            CompressionLevel::Uncompressed,
1331        );
1332
1333        let first_error = encoder.write_all(b"ABCD").unwrap_err();
1334        assert_eq!(first_error.kind(), ErrorKind::BrokenPipe);
1335
1336        let second_error = encoder.write_all(b"EFGH").unwrap_err();
1337        assert_eq!(second_error.kind(), ErrorKind::BrokenPipe);
1338    }
1339
1340    #[test]
1341    fn write_reports_progress_but_poisoning_is_sticky_after_later_block_failure() {
1342        let payload = b"ABCDEFGHIJKL";
1343        let mut encoder = StreamingEncoder::new_with_matcher(
1344            TinyMatcher::new(4),
1345            FailingWriteOnce::new(3),
1346            CompressionLevel::Uncompressed,
1347        );
1348
1349        let first_write = encoder.write(payload).unwrap();
1350        assert_eq!(first_write, 8);
1351        assert!(encoder.write(&payload[first_write..]).is_err());
1352        assert!(encoder.flush().is_err());
1353        assert!(encoder.write_all(b"EFGH").is_err());
1354    }
1355
1356    #[test]
1357    fn partial_write_failure_after_progress_poisons_encoder() {
1358        let payload = b"ABCDEFGHIJKL";
1359        let mut encoder = StreamingEncoder::new_with_matcher(
1360            TinyMatcher::new(4),
1361            PartialThenFailWriter::new(3, 1),
1362            CompressionLevel::Uncompressed,
1363        );
1364
1365        let first_write = encoder.write(payload).unwrap();
1366        assert_eq!(first_write, 8);
1367
1368        let second_write = encoder.write(&payload[first_write..]);
1369        assert!(second_write.is_err());
1370        assert!(encoder.flush().is_err());
1371        assert!(encoder.write_all(b"MNOP").is_err());
1372    }
1373
1374    #[test]
1375    fn new_with_matcher_and_get_mut_work() {
1376        let matcher = TinyMatcher::new(128 * 1024);
1377        let mut encoder =
1378            StreamingEncoder::new_with_matcher(matcher, Vec::new(), CompressionLevel::Fastest);
1379        encoder.get_mut().extend_from_slice(b"");
1380        encoder.write_all(b"custom-matcher").unwrap();
1381        let compressed = encoder.finish().unwrap();
1382        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1383        let mut decoded = Vec::new();
1384        decoder.read_to_end(&mut decoded).unwrap();
1385        assert_eq!(decoded, b"custom-matcher");
1386    }
1387
1388    #[test]
1389    fn pledged_content_size_written_in_header() {
1390        let payload = b"hello world, pledged size test";
1391        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1392        encoder
1393            .set_pledged_content_size(payload.len() as u64)
1394            .unwrap();
1395        encoder.write_all(payload).unwrap();
1396        let compressed = encoder.finish().unwrap();
1397
1398        // Verify FCS is present and correct
1399        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1400            .unwrap()
1401            .0;
1402        assert_eq!(header.frame_content_size(), payload.len() as u64);
1403
1404        // Verify roundtrip
1405        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1406        let mut decoded = Vec::new();
1407        decoder.read_to_end(&mut decoded).unwrap();
1408        assert_eq!(decoded, payload);
1409    }
1410
1411    #[test]
1412    fn pledged_content_size_mismatch_returns_error() {
1413        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1414        encoder.set_pledged_content_size(100).unwrap();
1415        encoder.write_all(b"short payload").unwrap(); // 13 bytes != 100 pledged
1416        let err = encoder.finish().unwrap_err();
1417        assert_eq!(err.kind(), ErrorKind::InvalidInput);
1418    }
1419
1420    #[test]
1421    fn write_exceeding_pledge_returns_error() {
1422        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1423        encoder.set_pledged_content_size(5).unwrap();
1424        let err = encoder.write_all(b"exceeds five bytes").unwrap_err();
1425        assert_eq!(err.kind(), ErrorKind::InvalidInput);
1426    }
1427
1428    #[test]
1429    fn write_straddling_pledge_reports_partial_progress() {
1430        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1431        encoder.set_pledged_content_size(5).unwrap();
1432        // write() should accept exactly 5 bytes (partial progress)
1433        assert_eq!(encoder.write(b"abcdef").unwrap(), 5);
1434        // Next write should fail — pledge exhausted
1435        let err = encoder.write(b"g").unwrap_err();
1436        assert_eq!(err.kind(), ErrorKind::InvalidInput);
1437    }
1438
1439    #[test]
1440    fn encoded_scratch_capacity_is_reused_across_blocks() {
1441        let payload = vec![0xAB; 64 * 3];
1442        let mut encoder = StreamingEncoder::new_with_matcher(
1443            TinyMatcher::new(64),
1444            Vec::new(),
1445            CompressionLevel::Uncompressed,
1446        );
1447
1448        encoder.write_all(&payload[..64]).unwrap();
1449        let first_capacity = encoder.encoded_scratch.capacity();
1450        assert!(
1451            first_capacity >= 67,
1452            "expected encoded scratch to keep block header + payload capacity",
1453        );
1454
1455        encoder.write_all(&payload[64..128]).unwrap();
1456        let second_capacity = encoder.encoded_scratch.capacity();
1457        assert!(
1458            second_capacity >= first_capacity,
1459            "encoded scratch capacity should be reused across block emits",
1460        );
1461
1462        encoder.write_all(&payload[128..]).unwrap();
1463        let compressed = encoder.finish().unwrap();
1464        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1465        let mut decoded = Vec::new();
1466        decoder.read_to_end(&mut decoded).unwrap();
1467        assert_eq!(decoded, payload);
1468    }
1469
1470    #[test]
1471    fn pledged_content_size_after_write_returns_error() {
1472        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1473        encoder.write_all(b"already writing").unwrap();
1474        let err = encoder.set_pledged_content_size(15).unwrap_err();
1475        assert_eq!(err.kind(), ErrorKind::InvalidInput);
1476    }
1477
1478    #[test]
1479    fn source_size_hint_directly_reduces_window_header() {
1480        let payload = b"streaming-source-size-hint".repeat(64);
1481
1482        let mut no_hint = StreamingEncoder::new(Vec::new(), CompressionLevel::from_level(11));
1483        no_hint.write_all(payload.as_slice()).unwrap();
1484        let no_hint_frame = no_hint.finish().unwrap();
1485        let no_hint_header = crate::decoding::frame::read_frame_header(no_hint_frame.as_slice())
1486            .unwrap()
1487            .0;
1488        let no_hint_window = no_hint_header.window_size().unwrap();
1489
1490        let mut with_hint = StreamingEncoder::new(Vec::new(), CompressionLevel::from_level(11));
1491        with_hint
1492            .set_source_size_hint(payload.len() as u64)
1493            .unwrap();
1494        with_hint.write_all(payload.as_slice()).unwrap();
1495        let late_hint_err = with_hint
1496            .set_source_size_hint(payload.len() as u64)
1497            .unwrap_err();
1498        assert_eq!(late_hint_err.kind(), ErrorKind::InvalidInput);
1499        let with_hint_frame = with_hint.finish().unwrap();
1500        let with_hint_header =
1501            crate::decoding::frame::read_frame_header(with_hint_frame.as_slice())
1502                .unwrap()
1503                .0;
1504        let with_hint_window = with_hint_header.window_size().unwrap();
1505
1506        assert!(
1507            with_hint_window <= no_hint_window,
1508            "source size hint should not increase advertised window"
1509        );
1510
1511        let mut decoder = StreamingDecoder::new(with_hint_frame.as_slice()).unwrap();
1512        let mut decoded = Vec::new();
1513        decoder.read_to_end(&mut decoded).unwrap();
1514        assert_eq!(decoded, payload);
1515    }
1516
1517    #[test]
1518    fn single_segment_requires_pledged_to_fit_matcher_window() {
1519        let payload = b"streaming-window-gate-".repeat(60); // 1320 bytes
1520        let mut encoder = StreamingEncoder::new_with_matcher(
1521            TinyMatcher::new(1024),
1522            Vec::new(),
1523            CompressionLevel::Fastest,
1524        );
1525        encoder
1526            .set_pledged_content_size(payload.len() as u64)
1527            .unwrap();
1528        encoder.write_all(payload.as_slice()).unwrap();
1529        let compressed = encoder.finish().unwrap();
1530
1531        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1532            .unwrap()
1533            .0;
1534        assert_eq!(header.frame_content_size(), payload.len() as u64);
1535        assert!(
1536            !header.descriptor.single_segment_flag(),
1537            "single-segment must stay off when pledged content size exceeds matcher window"
1538        );
1539        assert!(
1540            header.window_size().unwrap() >= 1024,
1541            "window descriptor should be present when single-segment is disabled"
1542        );
1543    }
1544
1545    #[test]
1546    fn ensure_frame_started_refreshes_stale_strategy_tag_at_reset() {
1547        // The literal-compression gates (`min_literals_to_compress`,
1548        // `min_gain`) read `state.strategy_tag`. Regression: every
1549        // reset site MUST refresh that tag from the active compression
1550        // level — relying on construction-time initialization alone is
1551        // not enough, because later mutations or reuse patterns can
1552        // leave the tag stale.
1553        //
1554        // To exercise the RESET-time refresh (not just the
1555        // construction-time init that `StreamingEncoder::new` does for
1556        // free), this test deliberately corrupts `state.strategy_tag`
1557        // to a value that does NOT match the active level, then
1558        // triggers `ensure_frame_started` and asserts the reset path
1559        // wrote the correct tag back. If the sync line in
1560        // `ensure_frame_started` were deleted, the corrupted value
1561        // would survive the write and fail the assertion.
1562        use crate::encoding::strategy::StrategyTag;
1563        for level in [
1564            CompressionLevel::Fastest,
1565            CompressionLevel::Default,
1566            CompressionLevel::Better,
1567            CompressionLevel::Best,
1568        ] {
1569            let expected = StrategyTag::for_compression_level(level);
1570            let mut encoder = StreamingEncoder::new(Vec::new(), level);
1571            // Pick a sentinel that differs from the legitimate tag so
1572            // a missing reset-time sync is observable. BtUltra2 is the
1573            // most-aggressive variant; the four levels above resolve
1574            // to Fast/Dfast/Lazy/Lazy respectively, none equal to it.
1575            let sentinel = StrategyTag::BtUltra2;
1576            assert_ne!(
1577                expected, sentinel,
1578                "sentinel must differ from the legitimate tag at level {level:?}",
1579            );
1580            encoder.state.strategy_tag = sentinel;
1581            encoder.write_all(b"x").unwrap();
1582            assert_eq!(
1583                encoder.state.strategy_tag, expected,
1584                "reset-time strategy_tag sync missing at level {level:?}: \
1585                 sentinel survived `ensure_frame_started`",
1586            );
1587            let _ = encoder.finish().unwrap();
1588        }
1589    }
1590
1591    /// Level 22 advertises the largest default window (`window_log 27` =
1592    /// 128 MiB). Because streaming omits FCS, that window is written verbatim
1593    /// into the frame header — so the encoder's max window MUST NOT exceed the
1594    /// decoder's [`crate::common::MAXIMUM_ALLOWED_WINDOW_SIZE`], or our own
1595    /// decoder rejects our own frame with `WindowSizeTooBig`. Regression for
1596    /// the encoder↔decoder window-cap mismatch: streaming L22 must round-trip
1597    /// through `StreamingDecoder` (and, implicitly, any stock zstd decoder,
1598    /// which accepts up to the same 128 MiB default).
1599    #[test]
1600    fn level_22_streaming_window_roundtrips_in_our_decoder() {
1601        let payload = b"level-22-streaming-window-cap-".repeat(512);
1602        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::from_level(22));
1603        for chunk in payload.chunks(101) {
1604            encoder.write_all(chunk).unwrap();
1605        }
1606        let compressed = encoder.finish().unwrap();
1607
1608        // The advertised window equals the L22 default (128 MiB) and must sit
1609        // at or below the decoder cap — otherwise the round-trip below fails.
1610        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1611            .unwrap()
1612            .0;
1613        let window = header.window_size().unwrap();
1614        assert!(
1615            window <= crate::common::MAXIMUM_ALLOWED_WINDOW_SIZE,
1616            "L22 advertised window {window} exceeds decoder cap {}",
1617            crate::common::MAXIMUM_ALLOWED_WINDOW_SIZE,
1618        );
1619
1620        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1621        let mut decoded = Vec::new();
1622        decoder.read_to_end(&mut decoded).unwrap();
1623        assert_eq!(decoded, payload);
1624    }
1625
1626    /// `set_content_checksum(false)` before the first write must clear the
1627    /// frame header's `Content_Checksum_flag` and the frame must still
1628    /// round-trip through the decoder.
1629    #[test]
1630    fn streaming_encoder_set_content_checksum_false_clears_header_flag() {
1631        let payload = b"streaming-checksum-toggle-".repeat(64);
1632        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1633        encoder
1634            .set_content_checksum(false)
1635            .expect("set_content_checksum pre-write");
1636        encoder.write_all(&payload).unwrap();
1637        let compressed = encoder.finish().unwrap();
1638
1639        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1640            .unwrap()
1641            .0;
1642        assert!(
1643            !header.descriptor.content_checksum_flag(),
1644            "content_checksum(false) must clear the frame header flag",
1645        );
1646
1647        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1648        let mut decoded = Vec::new();
1649        decoder.read_to_end(&mut decoded).unwrap();
1650        assert_eq!(decoded, payload);
1651    }
1652
1653    /// With the `hash` feature, disabling the checksum must drop exactly the
1654    /// 4-byte XXH64 trailer: the same payload encoded with the checksum on is
1655    /// 4 bytes longer and its header flag is set.
1656    #[cfg(feature = "hash")]
1657    #[test]
1658    fn streaming_encoder_set_content_checksum_false_omits_trailer() {
1659        let payload = b"streaming-checksum-trailer-".repeat(64);
1660
1661        let mut with = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1662        // Explicit: the encoder default is off (upstream library parity).
1663        with.set_content_checksum(true)
1664            .expect("set_content_checksum pre-write");
1665        with.write_all(&payload).unwrap();
1666        let with_checksum = with.finish().unwrap();
1667
1668        let mut without = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1669        without
1670            .set_content_checksum(false)
1671            .expect("set_content_checksum pre-write");
1672        without.write_all(&payload).unwrap();
1673        let without_checksum = without.finish().unwrap();
1674
1675        assert!(
1676            crate::decoding::frame::read_frame_header(with_checksum.as_slice())
1677                .unwrap()
1678                .0
1679                .descriptor
1680                .content_checksum_flag(),
1681            "default checksum-on frame must set the header flag",
1682        );
1683        assert_eq!(
1684            with_checksum.len(),
1685            without_checksum.len() + 4,
1686            "checksum-on frame must carry exactly the 4-byte XXH64 trailer",
1687        );
1688    }
1689
1690    /// `set_content_checksum` after the first write must error: the frame
1691    /// header (and its checksum flag) is already emitted, so a late flip would
1692    /// desync the header flag from the emitted trailer. Mirrors
1693    /// `set_magicless` / `set_pledged_content_size` semantics.
1694    #[test]
1695    fn streaming_encoder_set_content_checksum_after_first_write_errors() {
1696        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1697        encoder.write_all(b"first-block").unwrap();
1698        let err = encoder
1699            .set_content_checksum(false)
1700            .expect_err("set_content_checksum after first write must error");
1701        assert_eq!(
1702            err.kind(),
1703            ErrorKind::InvalidInput,
1704            "expected InvalidInput when setting content checksum after frame_started, got {err:?}",
1705        );
1706    }
1707
1708    #[test]
1709    fn no_pledged_size_omits_fcs_from_header() {
1710        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1711        encoder.write_all(b"no pledged size").unwrap();
1712        let compressed = encoder.finish().unwrap();
1713
1714        // FCS should be omitted from the header; the decoder reports absent FCS as 0.
1715        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1716            .unwrap()
1717            .0;
1718        assert_eq!(header.frame_content_size(), 0);
1719        // Verify the descriptor confirms FCS field is truly absent (0 bytes),
1720        // not just FCS present with value 0.
1721        assert_eq!(header.descriptor.frame_content_size_bytes().unwrap(), 0);
1722    }
1723
1724    #[test]
1725    fn streaming_encoder_with_dictionary_roundtrips_and_carries_dict_id() {
1726        use alloc::format;
1727        let dict_raw = include_bytes!("../../dict_tests/dictionary");
1728        let dict_id = crate::decoding::Dictionary::decode_dict(dict_raw)
1729            .unwrap()
1730            .id;
1731
1732        // Dictionary-resembling payload (the dict was trained on similar lines),
1733        // fed in many small writes so the dict + cross-block matching are both
1734        // exercised by the streaming path.
1735        let mut payload = Vec::new();
1736        for i in 0..400u32 {
1737            payload.extend_from_slice(
1738                format!("tenant=demo table=orders key={i} region=eu payload=aaaaabbbbbccccc\n")
1739                    .as_bytes(),
1740            );
1741        }
1742
1743        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Level(19));
1744        encoder
1745            .set_dictionary_from_bytes(dict_raw)
1746            .expect("attach dictionary");
1747        for chunk in payload.chunks(777) {
1748            encoder.write_all(chunk).unwrap();
1749        }
1750        let compressed = encoder.finish().unwrap();
1751
1752        // The frame header advertises the dictionary ID (single-segment is
1753        // disabled for dictionary frames, so an explicit window is present).
1754        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1755            .unwrap()
1756            .0;
1757        assert_eq!(header.dictionary_id(), Some(dict_id));
1758
1759        // Round-trip through a decoder primed with the SAME dictionary.
1760        let mut decoder =
1761            StreamingDecoder::new_with_dictionary_bytes(compressed.as_slice(), dict_raw).unwrap();
1762        let mut decoded = Vec::new();
1763        decoder.read_to_end(&mut decoded).unwrap();
1764        assert_eq!(decoded, payload);
1765
1766        // The dictionary is actually used: the dict frame is no larger than the
1767        // no-dictionary frame on this dict-resembling payload (a dict that was
1768        // ignored could only ever make the frame the same size or bigger).
1769        let mut nodict = StreamingEncoder::new(Vec::new(), CompressionLevel::Level(19));
1770        for chunk in payload.chunks(777) {
1771            nodict.write_all(chunk).unwrap();
1772        }
1773        let nodict_frame = nodict.finish().unwrap();
1774        assert!(
1775            compressed.len() <= nodict_frame.len(),
1776            "dict frame {} should not exceed no-dict frame {}",
1777            compressed.len(),
1778            nodict_frame.len()
1779        );
1780    }
1781
1782    #[test]
1783    fn streaming_encoder_strategy_override_survives_frame_start() {
1784        // A `.strategy(...)` override must drive BOTH the matcher and the
1785        // literal-compression gates (`state.strategy_tag`) once the frame
1786        // starts. `ensure_frame_started` re-syncs the tag, so without persisting
1787        // the override it would silently fall back to the level's strategy and
1788        // diverge from `FrameCompressor` for the same parameters.
1789        use crate::encoding::{CompressionParameters, Strategy};
1790        let level = CompressionLevel::Fastest;
1791        let level_tag = crate::encoding::strategy::StrategyTag::for_compression_level(level);
1792        let override_tag = Strategy::Greedy.tag();
1793        assert_ne!(
1794            level_tag, override_tag,
1795            "test needs an override that changes the derived tag"
1796        );
1797
1798        let params = CompressionParameters::builder(level)
1799            .strategy(Strategy::Greedy)
1800            .build()
1801            .unwrap();
1802        let payload = b"override must outlive the frame header";
1803        let mut encoder = StreamingEncoder::new(Vec::new(), level);
1804        encoder.set_parameters(&params).unwrap();
1805        encoder.write_all(payload).unwrap();
1806        assert_eq!(
1807            encoder.state.strategy_tag, override_tag,
1808            "strategy override was discarded when the frame started"
1809        );
1810
1811        let compressed = encoder.finish().unwrap();
1812        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1813        let mut decoded = Vec::new();
1814        decoder.read_to_end(&mut decoded).unwrap();
1815        assert_eq!(decoded, payload);
1816    }
1817
1818    #[test]
1819    fn streaming_encoder_uncompressed_with_dictionary_omits_dict_id() {
1820        // At `Uncompressed` the matcher cannot prime a dictionary, so an
1821        // attached dictionary must NOT be reflected in the frame: advertising a
1822        // `Dictionary_ID` would force a dictionary at decode time for a frame
1823        // that does not actually depend on one. Mirrors `FrameCompressor`'s
1824        // `use_dictionary_state` gate.
1825        let dict_raw = include_bytes!("../../dict_tests/dictionary");
1826        let payload = b"tenant=demo table=orders region=eu payload=aaaaabbbbbccccc";
1827        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Uncompressed);
1828        encoder
1829            .set_dictionary_from_bytes(dict_raw)
1830            .expect("attach dictionary");
1831        encoder.write_all(payload).unwrap();
1832        let compressed = encoder.finish().unwrap();
1833
1834        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1835            .unwrap()
1836            .0;
1837        assert_eq!(
1838            header.dictionary_id(),
1839            None,
1840            "uncompressed frame must not require a dictionary at decode time"
1841        );
1842
1843        // Decodes WITHOUT any dictionary.
1844        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1845        let mut decoded = Vec::new();
1846        decoder.read_to_end(&mut decoded).unwrap();
1847        assert_eq!(decoded, payload);
1848    }
1849}