//! Streaming zstd frame encoder (structured_zstd/encoding/streaming_encoder.rs).
1use alloc::format;
2use alloc::string::{String, ToString};
3use alloc::vec::Vec;
4use core::mem;
5
6use crate::common::MAX_BLOCK_SIZE;
7#[cfg(feature = "hash")]
8use core::hash::Hasher;
9#[cfg(feature = "hash")]
10use twox_hash::XxHash64;
11
12use crate::encoding::levels::compress_block_encoded;
13use crate::encoding::{
14    CompressionLevel, MatchGeneratorDriver, Matcher, block_header::BlockHeader,
15    frame_compressor::CompressState, frame_compressor::FseTables, frame_header::FrameHeader,
16};
17use crate::io::{Error, ErrorKind, Write};
18
/// Incremental frame encoder that implements [`Write`].
///
/// Data can be provided with multiple `write()` calls. Full blocks are compressed
/// automatically, `flush()` emits the currently buffered partial block as non-last,
/// and `finish()` closes the frame and returns the wrapped writer.
pub struct StreamingEncoder<W: Write, M: Matcher = MatchGeneratorDriver> {
    // `Some` until `finish()` takes the writer out to return it.
    drain: Option<W>,
    // Level applied to every block emitted by this encoder.
    compression_level: CompressionLevel,
    // Matcher plus entropy-coding state carried across the frame's blocks.
    state: CompressState<M>,
    // Buffered, not-yet-encoded payload for the current (partial) block.
    pending: Vec<u8>,
    // Sticky poison flag: once any operation fails, later calls replay the error.
    errored: bool,
    // Kind of the first recorded failure, replayed by `sticky_error()`.
    last_error_kind: Option<ErrorKind>,
    // Message of the first recorded failure, replayed by `sticky_error()`.
    last_error_message: Option<String>,
    // True once the frame header has been written to `drain`.
    frame_started: bool,
    // Exact content size promised via `set_pledged_content_size`, if any.
    pledged_content_size: Option<u64>,
    // Total uncompressed bytes accepted so far (checked against the pledge).
    bytes_consumed: u64,
    // Running xxhash64 over the uncompressed payload for the frame checksum.
    #[cfg(feature = "hash")]
    hasher: XxHash64,
}
38
39impl<W: Write> StreamingEncoder<W, MatchGeneratorDriver> {
40    /// Creates a streaming encoder backed by the default match generator.
41    ///
42    /// The encoder writes compressed bytes into `drain` and applies `compression_level`
43    /// to all subsequently written blocks.
44    pub fn new(drain: W, compression_level: CompressionLevel) -> Self {
45        Self::new_with_matcher(
46            MatchGeneratorDriver::new(MAX_BLOCK_SIZE as usize, 1),
47            drain,
48            compression_level,
49        )
50    }
51}
52
53impl<W: Write, M: Matcher> StreamingEncoder<W, M> {
54    /// Creates a streaming encoder with an explicitly provided matcher implementation.
55    ///
56    /// This constructor is primarily intended for tests and advanced callers that need
57    /// custom match-window behavior.
58    pub fn new_with_matcher(matcher: M, drain: W, compression_level: CompressionLevel) -> Self {
59        Self {
60            drain: Some(drain),
61            compression_level,
62            state: CompressState {
63                matcher,
64                last_huff_table: None,
65                fse_tables: FseTables::new(),
66                offset_hist: [1, 4, 8],
67            },
68            pending: Vec::new(),
69            errored: false,
70            last_error_kind: None,
71            last_error_message: None,
72            frame_started: false,
73            pledged_content_size: None,
74            bytes_consumed: 0,
75            #[cfg(feature = "hash")]
76            hasher: XxHash64::with_seed(0),
77        }
78    }
79
80    /// Pledge the total uncompressed content size for this frame.
81    ///
82    /// When set, the frame header will include a `Frame_Content_Size` field.
83    /// This enables decoders to pre-allocate output buffers.
84    /// The pledged size is also forwarded as a source-size hint to the
85    /// matcher so small inputs can use smaller matching tables.
86    ///
87    /// Must be called **before** the first [`write`](Write::write) call;
88    /// calling it after the frame header has already been emitted returns an
89    /// error.
90    pub fn set_pledged_content_size(&mut self, size: u64) -> Result<(), Error> {
91        self.ensure_open()?;
92        if self.frame_started {
93            return Err(invalid_input_error(
94                "pledged content size must be set before the first write",
95            ));
96        }
97        self.pledged_content_size = Some(size);
98        // Also use pledged size as source-size hint so the matcher
99        // can select smaller tables for small inputs.
100        self.state.matcher.set_source_size_hint(size);
101        Ok(())
102    }
103
104    /// Provide a hint about the total uncompressed size for the next frame.
105    ///
106    /// Unlike [`set_pledged_content_size`](Self::set_pledged_content_size),
107    /// this does **not** enforce that exactly `size` bytes are written; it
108    /// may reduce matcher tables, advertised frame window, and block sizing
109    /// for small inputs. Must be called before the first
110    /// [`write`](Write::write).
111    pub fn set_source_size_hint(&mut self, size: u64) -> Result<(), Error> {
112        self.ensure_open()?;
113        if self.frame_started {
114            return Err(invalid_input_error(
115                "source size hint must be set before the first write",
116            ));
117        }
118        self.state.matcher.set_source_size_hint(size);
119        Ok(())
120    }
121
    /// Returns an immutable reference to the wrapped output drain.
    ///
    /// The drain remains available for the encoder lifetime; [`finish`](Self::finish)
    /// consumes the encoder and returns ownership of the drain.
    ///
    /// # Panics
    /// Cannot panic in practice: `drain` is only taken by `finish()`, which
    /// consumes `self`, so it is always `Some` while `&self` exists.
    pub fn get_ref(&self) -> &W {
        self.drain
            .as_ref()
            .expect("streaming encoder drain is present until finish consumes self")
    }
131
    /// Returns a mutable reference to the wrapped output drain.
    ///
    /// It is inadvisable to directly write to the underlying writer, as doing
    /// so would corrupt the zstd frame being assembled by the encoder.
    ///
    /// The drain remains available for the encoder lifetime; [`finish`](Self::finish)
    /// consumes the encoder and returns ownership of the drain.
    ///
    /// # Panics
    /// Cannot panic in practice: `drain` is only taken by `finish()`, which
    /// consumes `self`.
    pub fn get_mut(&mut self) -> &mut W {
        self.drain
            .as_mut()
            .expect("streaming encoder drain is present until finish consumes self")
    }
144
    /// Finalizes the current zstd frame and returns the wrapped output drain.
    ///
    /// If no payload was written yet, this still emits a valid empty frame.
    /// Calling this method consumes the encoder.
    ///
    /// # Errors
    /// Fails if the encoder was already poisoned, if a pledged content size
    /// does not match the bytes actually consumed, or if writing the final
    /// block, checksum, or flush to the drain fails.
    pub fn finish(mut self) -> Result<W, Error> {
        self.ensure_open()?;

        // Validate the pledge before finalizing the frame. If finish() is
        // called before any writes, this also avoids emitting a header with
        // an incorrect FCS into the drain on mismatch.
        if let Some(pledged) = self.pledged_content_size
            && self.bytes_consumed != pledged
        {
            return Err(invalid_input_error(
                "pledged content size does not match bytes consumed",
            ));
        }

        // Emits the frame header now if no write() call has done so yet.
        self.ensure_frame_started()?;

        if self.pending.is_empty() {
            // Nothing buffered: the frame still needs a block flagged "last".
            self.write_empty_last_block()
                .map_err(|err| self.fail(err))?;
        } else {
            self.emit_pending_block(true)?;
        }

        let mut drain = self
            .drain
            .take()
            .expect("streaming encoder drain must be present when finishing");

        #[cfg(feature = "hash")]
        {
            // Frame checksum: low 32 bits of the xxhash64 of the uncompressed
            // payload, little-endian, appended after the last block.
            let checksum = self.hasher.finish() as u32;
            drain
                .write_all(&checksum.to_le_bytes())
                .map_err(|err| self.fail(err))?;
        }

        drain.flush().map_err(|err| self.fail(err))?;
        Ok(drain)
    }
188
189    fn ensure_open(&self) -> Result<(), Error> {
190        if self.errored {
191            return Err(self.sticky_error());
192        }
193        Ok(())
194    }
195
    // Cold path (only reached after poisoning). The format!() calls still allocate
    // in no_std even though error_with_kind_message/other_error_owned drop the
    // message; this is acceptable on an error recovery path to keep match arms simple.
    /// Reconstructs the first recorded failure so every call after poisoning
    /// reports the same kind/message; falls back to a generic error when
    /// neither was captured.
    fn sticky_error(&self) -> Error {
        match (self.last_error_kind, self.last_error_message.as_deref()) {
            (Some(kind), Some(message)) => error_with_kind_message(
                kind,
                format!(
                    "streaming encoder is in an errored state due to previous {kind:?} failure: {message}"
                ),
            ),
            (Some(kind), None) => error_from_kind(kind),
            (None, Some(message)) => other_error_owned(format!(
                "streaming encoder is in an errored state: {message}"
            )),
            (None, None) => other_error("streaming encoder is in an errored state"),
        }
    }
214
215    fn drain_mut(&mut self) -> Result<&mut W, Error> {
216        self.drain
217            .as_mut()
218            .ok_or_else(|| other_error("streaming encoder has no active drain"))
219    }
220
    /// Lazily writes the frame header before the first block.
    ///
    /// On the first call it resets all per-frame state (matcher, repeat-offset
    /// history, Huffman/FSE table carry-over, checksum hasher), validates the
    /// matcher window, and serializes the `FrameHeader` into the drain.
    /// Subsequent calls are no-ops.
    fn ensure_frame_started(&mut self) -> Result<(), Error> {
        if self.frame_started {
            return Ok(());
        }

        self.ensure_level_supported()?;
        self.state.matcher.reset(self.compression_level);
        // [1, 4, 8] is the zstd format's initial repeat-offset history.
        self.state.offset_hist = [1, 4, 8];
        // Drop table carry-over so the first block of this frame cannot
        // reference tables from a previous frame.
        self.state.last_huff_table = None;
        self.state.fse_tables.ll_previous = None;
        self.state.fse_tables.ml_previous = None;
        self.state.fse_tables.of_previous = None;
        #[cfg(feature = "hash")]
        {
            self.hasher = XxHash64::with_seed(0);
        }

        let window_size = self.state.matcher.window_size();
        if window_size == 0 {
            return Err(invalid_input_error(
                "matcher reported window_size == 0, which is invalid",
            ));
        }

        let header = FrameHeader {
            frame_content_size: self.pledged_content_size,
            single_segment: false,
            // The checksum field is advertised only when the `hash` feature is on.
            content_checksum: cfg!(feature = "hash"),
            dictionary_id: None,
            window_size: Some(window_size),
        };
        let mut encoded_header = Vec::new();
        header.serialize(&mut encoded_header);
        self.drain_mut()
            .and_then(|drain| drain.write_all(&encoded_header))
            .map_err(|err| self.fail(err))?;

        self.frame_started = true;
        Ok(())
    }
261
262    fn block_capacity(&self) -> usize {
263        let matcher_window = self.state.matcher.window_size() as usize;
264        core::cmp::max(1, core::cmp::min(matcher_window, MAX_BLOCK_SIZE as usize))
265    }
266
267    fn allocate_pending_space(&mut self, block_capacity: usize) -> Vec<u8> {
268        let mut space = match self.compression_level {
269            CompressionLevel::Fastest
270            | CompressionLevel::Default
271            | CompressionLevel::Better
272            | CompressionLevel::Best
273            | CompressionLevel::Level(_) => self.state.matcher.get_next_space(),
274            CompressionLevel::Uncompressed => Vec::new(),
275        };
276        space.clear();
277        if space.capacity() > block_capacity {
278            space.shrink_to(block_capacity);
279        }
280        if space.capacity() < block_capacity {
281            space.reserve(block_capacity - space.capacity());
282        }
283        space
284    }
285
    /// Compresses and emits the pending buffer iff it holds exactly one full block.
    ///
    /// Returns `None` when nothing needed emitting (buffer not full) *or* the
    /// block was emitted successfully. Returns `Some(..)` only on failure:
    /// `Ok(consumed)` when this `write` call already accepted input
    /// (partial-write semantics — the encoder is poisoned and the error
    /// resurfaces on the next call), or `Err(err)` when no progress was made.
    fn emit_full_pending_block(
        &mut self,
        block_capacity: usize,
        consumed: usize,
    ) -> Option<Result<usize, Error>> {
        if self.pending.len() != block_capacity {
            return None;
        }

        // Swap in a fresh buffer so encode_block can take ownership of the
        // full one; on failure the original payload is restored for retry.
        let new_pending = self.allocate_pending_space(block_capacity);
        let full_block = mem::replace(&mut self.pending, new_pending);
        if let Err((err, restored_block)) = self.encode_block(full_block, false) {
            self.pending = restored_block;
            let err = self.fail(err);
            if consumed > 0 {
                return Some(Ok(consumed));
            }
            return Some(Err(err));
        }
        None
    }
307
308    fn emit_pending_block(&mut self, last_block: bool) -> Result<(), Error> {
309        let block = mem::take(&mut self.pending);
310        if let Err((err, restored_block)) = self.encode_block(block, last_block) {
311            self.pending = restored_block;
312            return Err(self.fail(err));
313        }
314        if !last_block {
315            let block_capacity = self.block_capacity();
316            self.pending = self.allocate_pending_space(block_capacity);
317        }
318        Ok(())
319    }
320
321    // Exhaustive match kept intentionally: adding a new CompressionLevel
322    // variant will produce a compile error here, forcing the developer to
323    // decide whether the streaming encoder supports it before shipping.
324    fn ensure_level_supported(&self) -> Result<(), Error> {
325        match self.compression_level {
326            CompressionLevel::Uncompressed
327            | CompressionLevel::Fastest
328            | CompressionLevel::Default
329            | CompressionLevel::Better
330            | CompressionLevel::Best
331            | CompressionLevel::Level(_) => Ok(()),
332        }
333    }
334
    /// Serializes one block (header + payload) and writes it to the drain.
    ///
    /// Empty input produces a zero-length raw block (used to terminate empty
    /// frames). `Uncompressed` emits a raw block verbatim; all other levels
    /// hand the payload to the matcher-backed `compress_block_encoded`. On a
    /// drain write failure the uncompressed payload is returned alongside the
    /// error so the caller can restore it into `pending`.
    fn encode_block(
        &mut self,
        uncompressed_data: Vec<u8>,
        last_block: bool,
    ) -> Result<(), (Error, Vec<u8>)> {
        let mut raw_block = Some(uncompressed_data);
        // TODO: reuse scratch buffer across blocks to reduce allocation churn (#47)
        let mut encoded = Vec::with_capacity(self.block_capacity() + 3);
        // Tracks whether the payload now lives inside the matcher (compressed
        // path), which changes how it is recovered on error and hashed below.
        let mut moved_into_matcher = false;
        if raw_block.as_ref().is_some_and(|block| block.is_empty()) {
            // Zero-length raw block: header only, no payload bytes.
            let header = BlockHeader {
                last_block,
                block_type: crate::blocks::block::BlockType::Raw,
                block_size: 0,
            };
            header.serialize(&mut encoded);
        } else {
            match self.compression_level {
                CompressionLevel::Uncompressed => {
                    let block = raw_block.as_ref().expect("raw block missing");
                    let header = BlockHeader {
                        last_block,
                        block_type: crate::blocks::block::BlockType::Raw,
                        block_size: block.len() as u32,
                    };
                    header.serialize(&mut encoded);
                    encoded.extend_from_slice(block);
                }
                CompressionLevel::Fastest
                | CompressionLevel::Default
                | CompressionLevel::Better
                | CompressionLevel::Best
                | CompressionLevel::Level(_) => {
                    let block = raw_block.take().expect("raw block missing");
                    debug_assert!(!block.is_empty(), "empty blocks handled above");
                    compress_block_encoded(&mut self.state, last_block, block, &mut encoded);
                    moved_into_matcher = true;
                }
            }
        }

        if let Err(err) = self.drain_mut().and_then(|drain| drain.write_all(&encoded)) {
            // Recover the original payload so the caller can restore it.
            // NOTE(review): assumes `get_last_space()` still holds this block's
            // uncompressed bytes after `compress_block_encoded` — confirm
            // against the Matcher contract.
            let restored = if moved_into_matcher {
                self.state.matcher.get_last_space().to_vec()
            } else {
                raw_block.unwrap_or_default()
            };
            return Err((err, restored));
        }

        if moved_into_matcher {
            #[cfg(feature = "hash")]
            {
                // Compressed path: hash the copy retained by the matcher.
                self.hasher.write(self.state.matcher.get_last_space());
            }
        } else {
            self.hash_block(raw_block.as_deref().unwrap_or(&[]));
        }
        Ok(())
    }
395
396    fn write_empty_last_block(&mut self) -> Result<(), Error> {
397        self.encode_block(Vec::new(), true).map_err(|(err, _)| err)
398    }
399
400    fn fail(&mut self, err: Error) -> Error {
401        self.errored = true;
402        if self.last_error_kind.is_none() {
403            self.last_error_kind = Some(err.kind());
404        }
405        if self.last_error_message.is_none() {
406            self.last_error_message = Some(err.to_string());
407        }
408        err
409    }
410
    /// Folds an uncompressed payload into the running frame checksum.
    #[cfg(feature = "hash")]
    fn hash_block(&mut self, uncompressed_data: &[u8]) {
        self.hasher.write(uncompressed_data);
    }

    /// No-op stand-in when the `hash` feature (and thus the checksum) is disabled.
    #[cfg(not(feature = "hash"))]
    fn hash_block(&mut self, _uncompressed_data: &[u8]) {}
418}
419
420impl<W: Write, M: Matcher> Write for StreamingEncoder<W, M> {
    /// Buffers `buf`, compressing and emitting every completed block.
    ///
    /// Honors partial-write semantics: the return value is the number of bytes
    /// accepted, which may be less than `buf.len()` when a pledged content
    /// size caps the input or when a block fails after progress was made (in
    /// which case the encoder is poisoned and the error resurfaces on the
    /// next call).
    fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
        self.ensure_open()?;
        if buf.is_empty() {
            return Ok(0);
        }

        // Check pledge before emitting the frame header so that a misuse
        // like set_pledged_content_size(0) + write(non_empty) doesn't leave
        // a partially-written header in the drain.
        if let Some(pledged) = self.pledged_content_size
            && self.bytes_consumed >= pledged
        {
            return Err(invalid_input_error(
                "write would exceed pledged content size",
            ));
        }

        self.ensure_frame_started()?;

        // Enforce pledged upper bound: truncate the accepted slice to the
        // remaining allowance so that partial-write semantics are honored
        // (return Ok(n) with n < buf.len()) instead of failing the full call.
        let buf = if let Some(pledged) = self.pledged_content_size {
            let remaining_allowed = pledged
                .checked_sub(self.bytes_consumed)
                .ok_or_else(|| invalid_input_error("bytes consumed exceed pledged content size"))?;
            if remaining_allowed == 0 {
                return Err(invalid_input_error(
                    "write would exceed pledged content size",
                ));
            }
            let accepted = core::cmp::min(
                buf.len(),
                usize::try_from(remaining_allowed).unwrap_or(usize::MAX),
            );
            &buf[..accepted]
        } else {
            buf
        };

        let block_capacity = self.block_capacity();
        if self.pending.capacity() == 0 {
            // First write of a frame: set up the block buffer.
            self.pending = self.allocate_pending_space(block_capacity);
        }
        let mut remaining = buf;
        let mut consumed = 0usize;

        while !remaining.is_empty() {
            // Defensive: drain a buffer left full by a previous call before
            // accepting more input (consumed is still 0 when this can fire,
            // so no byte accounting is needed on this path).
            if let Some(result) = self.emit_full_pending_block(block_capacity, consumed) {
                return result;
            }

            let available = block_capacity - self.pending.len();
            let to_take = core::cmp::min(remaining.len(), available);
            if to_take == 0 {
                break;
            }
            self.pending.extend_from_slice(&remaining[..to_take]);
            remaining = &remaining[to_take..];
            consumed += to_take;

            // Emit immediately when the buffer fills; on failure-after-progress
            // account the accepted bytes before reporting Ok(n) to the caller.
            if let Some(result) = self.emit_full_pending_block(block_capacity, consumed) {
                if let Ok(n) = &result {
                    self.bytes_consumed += *n as u64;
                }
                return result;
            }
        }
        self.bytes_consumed += consumed as u64;
        Ok(consumed)
    }
492
493    fn flush(&mut self) -> Result<(), Error> {
494        self.ensure_open()?;
495        if self.pending.is_empty() {
496            return self
497                .drain_mut()
498                .and_then(|drain| drain.flush())
499                .map_err(|err| self.fail(err));
500        }
501        self.ensure_frame_started()?;
502        self.emit_pending_block(false)?;
503        self.drain_mut()
504            .and_then(|drain| drain.flush())
505            .map_err(|err| self.fail(err))
506    }
507}
508
509fn error_from_kind(kind: ErrorKind) -> Error {
510    Error::from(kind)
511}
512
/// Builds an error of `kind` carrying `message`.
///
/// The `no_std` path boxes the message to satisfy that `Error::new` signature.
fn error_with_kind_message(kind: ErrorKind, message: String) -> Error {
    #[cfg(feature = "std")]
    {
        Error::new(kind, message)
    }
    #[cfg(not(feature = "std"))]
    {
        Error::new(kind, alloc::boxed::Box::new(message))
    }
}
523
/// Builds an invalid-input error from a static message.
///
/// NOTE(review): with `std` this uses `ErrorKind::InvalidInput`, but the
/// `no_std` path falls back to `ErrorKind::Other` — presumably the crate's
/// `no_std` `ErrorKind` lacks `InvalidInput`. Confirm: callers matching on
/// the kind observe different values depending on the feature set.
fn invalid_input_error(message: &str) -> Error {
    #[cfg(feature = "std")]
    {
        Error::new(ErrorKind::InvalidInput, message)
    }
    #[cfg(not(feature = "std"))]
    {
        Error::new(
            ErrorKind::Other,
            alloc::boxed::Box::new(alloc::string::String::from(message)),
        )
    }
}
537
/// Builds a generic ("other") error from an already-owned message.
fn other_error_owned(message: String) -> Error {
    #[cfg(feature = "std")]
    {
        Error::other(message)
    }
    #[cfg(not(feature = "std"))]
    {
        Error::new(ErrorKind::Other, alloc::boxed::Box::new(message))
    }
}
548
/// Builds a generic ("other") error from a static message.
fn other_error(message: &str) -> Error {
    #[cfg(feature = "std")]
    {
        Error::other(message)
    }
    #[cfg(not(feature = "std"))]
    {
        Error::new(
            ErrorKind::Other,
            alloc::boxed::Box::new(alloc::string::String::from(message)),
        )
    }
}
562
563#[cfg(test)]
564mod tests {
565    use crate::decoding::StreamingDecoder;
566    use crate::encoding::{CompressionLevel, Matcher, Sequence, StreamingEncoder};
567    use crate::io::{Error, ErrorKind, Read, Write};
568    use alloc::vec;
569    use alloc::vec::Vec;
570
    /// Minimal [`Matcher`] stub with a fixed, configurable window size.
    ///
    /// Lets tests force tiny block capacities (or an invalid zero window)
    /// without running the real match generator.
    struct TinyMatcher {
        last_space: Vec<u8>,
        window_size: u64,
    }

    impl TinyMatcher {
        fn new(window_size: u64) -> Self {
            Self {
                last_space: Vec::new(),
                window_size,
            }
        }
    }

    impl Matcher for TinyMatcher {
        fn get_next_space(&mut self) -> Vec<u8> {
            vec![0; self.window_size as usize]
        }

        fn get_last_space(&mut self) -> &[u8] {
            self.last_space.as_slice()
        }

        fn commit_space(&mut self, space: Vec<u8>) {
            self.last_space = space;
        }

        fn skip_matching(&mut self) {}

        // Emits the whole committed block as literals — no matches, so the
        // output is deterministic and trivially decodable.
        fn start_matching(&mut self, mut handle_sequence: impl for<'a> FnMut(Sequence<'a>)) {
            handle_sequence(Sequence::Literals {
                literals: self.last_space.as_slice(),
            });
        }

        fn reset(&mut self, _level: CompressionLevel) {
            self.last_space.clear();
        }

        fn window_size(&self) -> u64 {
            self.window_size
        }
    }

    /// Writer that fails exactly one write call (selected by ordinal) and
    /// succeeds otherwise, capturing successful bytes in `sink`.
    struct FailingWriteOnce {
        writes: usize,
        fail_on_write_number: usize,
        sink: Vec<u8>,
    }

    impl FailingWriteOnce {
        fn new(fail_on_write_number: usize) -> Self {
            Self {
                writes: 0,
                fail_on_write_number,
                sink: Vec::new(),
            }
        }
    }

    impl Write for FailingWriteOnce {
        fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
            self.writes += 1;
            if self.writes == self.fail_on_write_number {
                return Err(super::other_error("injected write failure"));
            }
            self.sink.extend_from_slice(buf);
            Ok(buf.len())
        }

        fn flush(&mut self) -> Result<(), Error> {
            Ok(())
        }
    }

    /// Writer that fails one write call with a chosen [`ErrorKind`], for
    /// asserting that poisoning preserves the original error kind.
    struct FailingWithKind {
        writes: usize,
        fail_on_write_number: usize,
        kind: ErrorKind,
    }

    impl FailingWithKind {
        fn new(fail_on_write_number: usize, kind: ErrorKind) -> Self {
            Self {
                writes: 0,
                fail_on_write_number,
                kind,
            }
        }
    }

    impl Write for FailingWithKind {
        fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
            self.writes += 1;
            if self.writes == self.fail_on_write_number {
                return Err(Error::from(self.kind));
            }
            Ok(buf.len())
        }

        fn flush(&mut self) -> Result<(), Error> {
            Ok(())
        }
    }

    /// Writer that accepts only a short prefix on the designated write and
    /// fails every call after that — models a partially successful write
    /// followed by a dead sink.
    struct PartialThenFailWriter {
        writes: usize,
        fail_on_write_number: usize,
        partial_prefix_len: usize,
        terminal_failure: bool,
        sink: Vec<u8>,
    }

    impl PartialThenFailWriter {
        fn new(fail_on_write_number: usize, partial_prefix_len: usize) -> Self {
            Self {
                writes: 0,
                fail_on_write_number,
                partial_prefix_len,
                terminal_failure: false,
                sink: Vec::new(),
            }
        }
    }

    impl Write for PartialThenFailWriter {
        fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
            if self.terminal_failure {
                return Err(super::other_error("injected terminal write failure"));
            }

            self.writes += 1;
            if self.writes == self.fail_on_write_number {
                // Accept a prefix once, then become permanently broken.
                let written = core::cmp::min(self.partial_prefix_len, buf.len());
                if written > 0 {
                    self.sink.extend_from_slice(&buf[..written]);
                    self.terminal_failure = true;
                    return Ok(written);
                }
                return Err(super::other_error("injected terminal write failure"));
            }

            self.sink.extend_from_slice(buf);
            Ok(buf.len())
        }

        fn flush(&mut self) -> Result<(), Error> {
            Ok(())
        }
    }
721
    // Round-trip through the real decoder across many small writes.
    #[test]
    fn streaming_encoder_roundtrip_multiple_writes() {
        let payload = b"streaming-encoder-roundtrip-".repeat(1024);
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        for chunk in payload.chunks(313) {
            encoder.write_all(chunk).unwrap();
        }
        let compressed = encoder.finish().unwrap();

        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }

    // flush() must push the buffered partial block out as a non-last block.
    #[test]
    fn flush_emits_nonempty_partial_output() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.write_all(b"partial-block").unwrap();
        encoder.flush().unwrap();
        let flushed_len = encoder.get_ref().len();
        assert!(
            flushed_len > 0,
            "flush should emit header+partial block bytes"
        );
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, b"partial-block");
    }

    // Flushing an untouched encoder must not emit the frame header.
    #[test]
    fn flush_without_writes_does_not_emit_frame_header() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.flush().unwrap();
        assert!(encoder.get_ref().is_empty());
    }

    // A write filling the block exactly should emit the block immediately.
    #[test]
    fn block_boundary_write_emits_block_in_same_call() {
        let mut boundary = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            Vec::new(),
            CompressionLevel::Uncompressed,
        );
        let mut below = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            Vec::new(),
            CompressionLevel::Uncompressed,
        );

        boundary.write_all(b"ABCD").unwrap();
        below.write_all(b"ABC").unwrap();

        let boundary_len = boundary.get_ref().len();
        let below_len = below.get_ref().len();
        assert!(
            boundary_len > below_len,
            "full block should be emitted immediately at block boundary"
        );
    }

    // finish() consumes the encoder and yields a decodable frame.
    #[test]
    fn finish_consumes_encoder_and_emits_frame() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.write_all(b"abc").unwrap();
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, b"abc");
    }

    // An encoder finished with no writes still produces a valid empty frame.
    #[test]
    fn finish_without_writes_emits_empty_frame() {
        let encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert!(decoded.is_empty());
    }

    // Zero-length writes are accepted and report zero bytes consumed.
    #[test]
    fn write_empty_buffer_returns_zero() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        assert_eq!(encoder.write(&[]).unwrap(), 0);
        let _ = encoder.finish().unwrap();
    }

    // Raw (Uncompressed) blocks must round-trip like compressed ones.
    #[test]
    fn uncompressed_level_roundtrip() {
        let payload = b"uncompressed-streaming-roundtrip".repeat(64);
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Uncompressed);
        for chunk in payload.chunks(41) {
            encoder.write_all(chunk).unwrap();
        }
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }

    // Round-trip at the Better level with unaligned write chunks.
    #[test]
    fn better_level_streaming_roundtrip() {
        let payload = b"better-level-streaming-test".repeat(256);
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Better);
        for chunk in payload.chunks(53) {
            encoder.write_all(chunk).unwrap();
        }
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }

    // A matcher advertising window_size == 0 must be rejected as invalid input.
    #[test]
    fn zero_window_matcher_returns_invalid_input_error() {
        let mut encoder = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(0),
            Vec::new(),
            CompressionLevel::Fastest,
        );
        let err = encoder.write_all(b"payload").unwrap_err();
        assert_eq!(err.kind(), ErrorKind::InvalidInput);
    }

    #[test]
    fn best_level_streaming_roundtrip() {
        // 200 KiB payload crosses the 128 KiB block boundary, exercising
        // multi-block emission and matcher state carry-over for Best.
        let payload = b"best-level-streaming-test".repeat(8 * 1024);
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Best);
        for chunk in payload.chunks(53) {
            encoder.write_all(chunk).unwrap();
        }
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }

    // After the first drain failure every later operation must error too.
    #[test]
    fn write_failure_poisoning_is_sticky() {
        let mut encoder = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            FailingWriteOnce::new(1),
            CompressionLevel::Uncompressed,
        );

        assert!(encoder.write_all(b"ABCD").is_err());
        assert!(encoder.flush().is_err());
        assert!(encoder.write_all(b"EFGH").is_err());
        assert_eq!(encoder.get_ref().sink.len(), 0);
        assert!(encoder.finish().is_err());
    }

    // The sticky error must replay the original failure's ErrorKind.
    #[test]
    fn poisoned_encoder_returns_original_error_kind() {
        let mut encoder = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            FailingWithKind::new(1, ErrorKind::BrokenPipe),
            CompressionLevel::Uncompressed,
        );

        let first_error = encoder.write_all(b"ABCD").unwrap_err();
        assert_eq!(first_error.kind(), ErrorKind::BrokenPipe);

        let second_error = encoder.write_all(b"EFGH").unwrap_err();
        assert_eq!(second_error.kind(), ErrorKind::BrokenPipe);
    }

    // Partial-write semantics: a failure after progress reports Ok(n), then
    // the poisoned encoder rejects everything that follows.
    #[test]
    fn write_reports_progress_but_poisoning_is_sticky_after_later_block_failure() {
        let payload = b"ABCDEFGHIJKL";
        let mut encoder = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            FailingWriteOnce::new(3),
            CompressionLevel::Uncompressed,
        );

        let first_write = encoder.write(payload).unwrap();
        assert_eq!(first_write, 8);
        assert!(encoder.write(&payload[first_write..]).is_err());
        assert!(encoder.flush().is_err());
        assert!(encoder.write_all(b"EFGH").is_err());
    }
913
914    #[test]
915    fn partial_write_failure_after_progress_poisons_encoder() {
916        let payload = b"ABCDEFGHIJKL";
917        let mut encoder = StreamingEncoder::new_with_matcher(
918            TinyMatcher::new(4),
919            PartialThenFailWriter::new(3, 1),
920            CompressionLevel::Uncompressed,
921        );
922
923        let first_write = encoder.write(payload).unwrap();
924        assert_eq!(first_write, 8);
925
926        let second_write = encoder.write(&payload[first_write..]);
927        assert!(second_write.is_err());
928        assert!(encoder.flush().is_err());
929        assert!(encoder.write_all(b"MNOP").is_err());
930    }
931
932    #[test]
933    fn new_with_matcher_and_get_mut_work() {
934        let matcher = TinyMatcher::new(128 * 1024);
935        let mut encoder =
936            StreamingEncoder::new_with_matcher(matcher, Vec::new(), CompressionLevel::Fastest);
937        encoder.get_mut().extend_from_slice(b"");
938        encoder.write_all(b"custom-matcher").unwrap();
939        let compressed = encoder.finish().unwrap();
940        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
941        let mut decoded = Vec::new();
942        decoder.read_to_end(&mut decoded).unwrap();
943        assert_eq!(decoded, b"custom-matcher");
944    }
945
946    #[cfg(feature = "std")]
947    #[test]
948    fn streaming_encoder_output_decompresses_with_c_zstd() {
949        let payload = b"tenant=demo op=put key=streaming value=abcdef\n".repeat(4096);
950        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
951        for chunk in payload.chunks(1024) {
952            encoder.write_all(chunk).unwrap();
953        }
954        let compressed = encoder.finish().unwrap();
955
956        let mut decoded = Vec::with_capacity(payload.len());
957        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
958        assert_eq!(decoded, payload);
959    }
960
961    #[test]
962    fn pledged_content_size_written_in_header() {
963        let payload = b"hello world, pledged size test";
964        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
965        encoder
966            .set_pledged_content_size(payload.len() as u64)
967            .unwrap();
968        encoder.write_all(payload).unwrap();
969        let compressed = encoder.finish().unwrap();
970
971        // Verify FCS is present and correct
972        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
973            .unwrap()
974            .0;
975        assert_eq!(header.frame_content_size(), payload.len() as u64);
976
977        // Verify roundtrip
978        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
979        let mut decoded = Vec::new();
980        decoder.read_to_end(&mut decoded).unwrap();
981        assert_eq!(decoded, payload);
982    }
983
984    #[test]
985    fn pledged_content_size_mismatch_returns_error() {
986        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
987        encoder.set_pledged_content_size(100).unwrap();
988        encoder.write_all(b"short payload").unwrap(); // 13 bytes != 100 pledged
989        let err = encoder.finish().unwrap_err();
990        assert_eq!(err.kind(), ErrorKind::InvalidInput);
991    }
992
993    #[test]
994    fn write_exceeding_pledge_returns_error() {
995        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
996        encoder.set_pledged_content_size(5).unwrap();
997        let err = encoder.write_all(b"exceeds five bytes").unwrap_err();
998        assert_eq!(err.kind(), ErrorKind::InvalidInput);
999    }
1000
1001    #[test]
1002    fn write_straddling_pledge_reports_partial_progress() {
1003        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1004        encoder.set_pledged_content_size(5).unwrap();
1005        // write() should accept exactly 5 bytes (partial progress)
1006        assert_eq!(encoder.write(b"abcdef").unwrap(), 5);
1007        // Next write should fail — pledge exhausted
1008        let err = encoder.write(b"g").unwrap_err();
1009        assert_eq!(err.kind(), ErrorKind::InvalidInput);
1010    }
1011
1012    #[test]
1013    fn pledged_content_size_after_write_returns_error() {
1014        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1015        encoder.write_all(b"already writing").unwrap();
1016        let err = encoder.set_pledged_content_size(15).unwrap_err();
1017        assert_eq!(err.kind(), ErrorKind::InvalidInput);
1018    }
1019
1020    #[test]
1021    fn source_size_hint_directly_reduces_window_header() {
1022        let payload = b"streaming-source-size-hint".repeat(64);
1023
1024        let mut no_hint = StreamingEncoder::new(Vec::new(), CompressionLevel::from_level(11));
1025        no_hint.write_all(payload.as_slice()).unwrap();
1026        let no_hint_frame = no_hint.finish().unwrap();
1027        let no_hint_header = crate::decoding::frame::read_frame_header(no_hint_frame.as_slice())
1028            .unwrap()
1029            .0;
1030        let no_hint_window = no_hint_header.window_size().unwrap();
1031
1032        let mut with_hint = StreamingEncoder::new(Vec::new(), CompressionLevel::from_level(11));
1033        with_hint
1034            .set_source_size_hint(payload.len() as u64)
1035            .unwrap();
1036        with_hint.write_all(payload.as_slice()).unwrap();
1037        let late_hint_err = with_hint
1038            .set_source_size_hint(payload.len() as u64)
1039            .unwrap_err();
1040        assert_eq!(late_hint_err.kind(), ErrorKind::InvalidInput);
1041        let with_hint_frame = with_hint.finish().unwrap();
1042        let with_hint_header =
1043            crate::decoding::frame::read_frame_header(with_hint_frame.as_slice())
1044                .unwrap()
1045                .0;
1046        let with_hint_window = with_hint_header.window_size().unwrap();
1047
1048        assert!(
1049            with_hint_window <= no_hint_window,
1050            "source size hint should not increase advertised window"
1051        );
1052
1053        let mut decoder = StreamingDecoder::new(with_hint_frame.as_slice()).unwrap();
1054        let mut decoded = Vec::new();
1055        decoder.read_to_end(&mut decoded).unwrap();
1056        assert_eq!(decoded, payload);
1057    }
1058
1059    #[cfg(feature = "std")]
1060    #[test]
1061    fn pledged_content_size_c_zstd_compatible() {
1062        let payload = b"tenant=demo op=put key=streaming value=abcdef\n".repeat(4096);
1063        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1064        encoder
1065            .set_pledged_content_size(payload.len() as u64)
1066            .unwrap();
1067        for chunk in payload.chunks(1024) {
1068            encoder.write_all(chunk).unwrap();
1069        }
1070        let compressed = encoder.finish().unwrap();
1071
1072        // FCS should be written
1073        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1074            .unwrap()
1075            .0;
1076        assert_eq!(header.frame_content_size(), payload.len() as u64);
1077
1078        // C zstd should decompress successfully
1079        let mut decoded = Vec::new();
1080        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
1081        assert_eq!(decoded, payload);
1082    }
1083
1084    #[test]
1085    fn no_pledged_size_omits_fcs_from_header() {
1086        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1087        encoder.write_all(b"no pledged size").unwrap();
1088        let compressed = encoder.finish().unwrap();
1089
1090        // FCS should be omitted from the header; the decoder reports absent FCS as 0.
1091        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1092            .unwrap()
1093            .0;
1094        assert_eq!(header.frame_content_size(), 0);
1095        // Verify the descriptor confirms FCS field is truly absent (0 bytes),
1096        // not just FCS present with value 0.
1097        assert_eq!(header.descriptor.frame_content_size_bytes().unwrap(), 0);
1098    }
1099}