// structured_zstd/encoding/streaming_encoder.rs
1use alloc::format;
2use alloc::string::{String, ToString};
3use alloc::vec::Vec;
4use core::mem;
5
6use crate::common::MAX_BLOCK_SIZE;
7#[cfg(feature = "hash")]
8use core::hash::Hasher;
9#[cfg(feature = "hash")]
10use twox_hash::XxHash64;
11
12use crate::encoding::levels::compress_block_encoded;
13use crate::encoding::{
14    CompressionLevel, MatchGeneratorDriver, Matcher, block_header::BlockHeader,
15    frame_compressor::CompressState, frame_compressor::FseTables, frame_header::FrameHeader,
16};
17use crate::io::{Error, ErrorKind, Write};
18
/// Incremental frame encoder that implements [`Write`].
///
/// Data can be provided with multiple `write()` calls. Full blocks are compressed
/// automatically, `flush()` emits the currently buffered partial block as non-last,
/// and `finish()` closes the frame and returns the wrapped writer.
pub struct StreamingEncoder<W: Write, M: Matcher = MatchGeneratorDriver> {
    // Output sink; `Some` for the whole encoder lifetime until `finish`
    // takes it back out.
    drain: Option<W>,
    // Level applied to every block written through this encoder.
    compression_level: CompressionLevel,
    // Matcher plus carried-over Huffman/FSE tables and repeat-offset history.
    state: CompressState<M>,
    // Payload bytes buffered for the current (not yet emitted) block.
    pending: Vec<u8>,
    // Once true, every subsequent operation fails with `sticky_error()`.
    errored: bool,
    // Kind of the first recorded failure (used to rebuild the sticky error).
    last_error_kind: Option<ErrorKind>,
    // Message of the first recorded failure.
    last_error_message: Option<String>,
    // Whether the frame header has been written to the drain yet.
    frame_started: bool,
    // Running xxhash64 over all uncompressed payload bytes; its low 32 bits
    // are appended as the content checksum in `finish`.
    #[cfg(feature = "hash")]
    hasher: XxHash64,
}
36
37impl<W: Write> StreamingEncoder<W, MatchGeneratorDriver> {
38    /// Creates a streaming encoder backed by the default match generator.
39    ///
40    /// The encoder writes compressed bytes into `drain` and applies `compression_level`
41    /// to all subsequently written blocks.
42    pub fn new(drain: W, compression_level: CompressionLevel) -> Self {
43        Self::new_with_matcher(
44            MatchGeneratorDriver::new(MAX_BLOCK_SIZE as usize, 1),
45            drain,
46            compression_level,
47        )
48    }
49}
50
51impl<W: Write, M: Matcher> StreamingEncoder<W, M> {
52    /// Creates a streaming encoder with an explicitly provided matcher implementation.
53    ///
54    /// This constructor is primarily intended for tests and advanced callers that need
55    /// custom match-window behavior.
56    pub fn new_with_matcher(matcher: M, drain: W, compression_level: CompressionLevel) -> Self {
57        Self {
58            drain: Some(drain),
59            compression_level,
60            state: CompressState {
61                matcher,
62                last_huff_table: None,
63                fse_tables: FseTables::new(),
64                offset_hist: [1, 4, 8],
65            },
66            pending: Vec::new(),
67            errored: false,
68            last_error_kind: None,
69            last_error_message: None,
70            frame_started: false,
71            #[cfg(feature = "hash")]
72            hasher: XxHash64::with_seed(0),
73        }
74    }
75
76    /// Returns an immutable reference to the wrapped output drain.
77    ///
78    /// The drain remains available for the encoder lifetime; [`finish`](Self::finish)
79    /// consumes the encoder and returns ownership of the drain.
80    pub fn get_ref(&self) -> &W {
81        self.drain
82            .as_ref()
83            .expect("streaming encoder drain is present until finish consumes self")
84    }
85
86    /// Returns a mutable reference to the wrapped output drain.
87    ///
88    /// It is inadvisable to directly write to the underlying writer, as doing
89    /// so would corrupt the zstd frame being assembled by the encoder.
90    ///
91    /// The drain remains available for the encoder lifetime; [`finish`](Self::finish)
92    /// consumes the encoder and returns ownership of the drain.
93    pub fn get_mut(&mut self) -> &mut W {
94        self.drain
95            .as_mut()
96            .expect("streaming encoder drain is present until finish consumes self")
97    }
98
99    /// Finalizes the current zstd frame and returns the wrapped output drain.
100    ///
101    /// If no payload was written yet, this still emits a valid empty frame.
102    /// Calling this method consumes the encoder.
103    pub fn finish(mut self) -> Result<W, Error> {
104        self.ensure_open()?;
105        self.ensure_frame_started()?;
106
107        if self.pending.is_empty() {
108            self.write_empty_last_block()
109                .map_err(|err| self.fail(err))?;
110        } else {
111            self.emit_pending_block(true)?;
112        }
113
114        let mut drain = self
115            .drain
116            .take()
117            .expect("streaming encoder drain must be present when finishing");
118
119        #[cfg(feature = "hash")]
120        {
121            let checksum = self.hasher.finish() as u32;
122            drain
123                .write_all(&checksum.to_le_bytes())
124                .map_err(|err| self.fail(err))?;
125        }
126
127        drain.flush().map_err(|err| self.fail(err))?;
128        Ok(drain)
129    }
130
131    fn ensure_open(&self) -> Result<(), Error> {
132        if self.errored {
133            return Err(self.sticky_error());
134        }
135        Ok(())
136    }
137
138    // Cold path (only reached after poisoning). The format!() calls still allocate
139    // in no_std even though error_with_kind_message/other_error_owned drop the
140    // message; this is acceptable on an error recovery path to keep match arms simple.
141    fn sticky_error(&self) -> Error {
142        match (self.last_error_kind, self.last_error_message.as_deref()) {
143            (Some(kind), Some(message)) => error_with_kind_message(
144                kind,
145                format!(
146                    "streaming encoder is in an errored state due to previous {kind:?} failure: {message}"
147                ),
148            ),
149            (Some(kind), None) => error_from_kind(kind),
150            (None, Some(message)) => other_error_owned(format!(
151                "streaming encoder is in an errored state: {message}"
152            )),
153            (None, None) => other_error("streaming encoder is in an errored state"),
154        }
155    }
156
157    fn drain_mut(&mut self) -> Result<&mut W, Error> {
158        self.drain
159            .as_mut()
160            .ok_or_else(|| other_error("streaming encoder has no active drain"))
161    }
162
163    fn ensure_frame_started(&mut self) -> Result<(), Error> {
164        if self.frame_started {
165            return Ok(());
166        }
167
168        self.ensure_level_supported()?;
169        self.state.matcher.reset(self.compression_level);
170        self.state.offset_hist = [1, 4, 8];
171        self.state.last_huff_table = None;
172        self.state.fse_tables.ll_previous = None;
173        self.state.fse_tables.ml_previous = None;
174        self.state.fse_tables.of_previous = None;
175        #[cfg(feature = "hash")]
176        {
177            self.hasher = XxHash64::with_seed(0);
178        }
179
180        let window_size = self.state.matcher.window_size();
181        if window_size == 0 {
182            return Err(invalid_input_error(
183                "matcher reported window_size == 0, which is invalid",
184            ));
185        }
186
187        let header = FrameHeader {
188            frame_content_size: None,
189            single_segment: false,
190            content_checksum: cfg!(feature = "hash"),
191            dictionary_id: None,
192            window_size: Some(window_size),
193        };
194        let mut encoded_header = Vec::new();
195        header.serialize(&mut encoded_header);
196        self.drain_mut()
197            .and_then(|drain| drain.write_all(&encoded_header))
198            .map_err(|err| self.fail(err))?;
199
200        self.frame_started = true;
201        Ok(())
202    }
203
204    fn block_capacity(&self) -> usize {
205        let matcher_window = self.state.matcher.window_size() as usize;
206        core::cmp::max(1, core::cmp::min(matcher_window, MAX_BLOCK_SIZE as usize))
207    }
208
209    fn allocate_pending_space(&mut self, block_capacity: usize) -> Vec<u8> {
210        let mut space = match self.compression_level {
211            CompressionLevel::Fastest
212            | CompressionLevel::Default
213            | CompressionLevel::Better
214            | CompressionLevel::Best => self.state.matcher.get_next_space(),
215            _ => Vec::new(),
216        };
217        space.clear();
218        if space.capacity() > block_capacity {
219            space.shrink_to(block_capacity);
220        }
221        if space.capacity() < block_capacity {
222            space.reserve(block_capacity - space.capacity());
223        }
224        space
225    }
226
227    fn emit_full_pending_block(
228        &mut self,
229        block_capacity: usize,
230        consumed: usize,
231    ) -> Option<Result<usize, Error>> {
232        if self.pending.len() != block_capacity {
233            return None;
234        }
235
236        let new_pending = self.allocate_pending_space(block_capacity);
237        let full_block = mem::replace(&mut self.pending, new_pending);
238        if let Err((err, restored_block)) = self.encode_block(full_block, false) {
239            self.pending = restored_block;
240            let err = self.fail(err);
241            if consumed > 0 {
242                return Some(Ok(consumed));
243            }
244            return Some(Err(err));
245        }
246        None
247    }
248
249    fn emit_pending_block(&mut self, last_block: bool) -> Result<(), Error> {
250        let block = mem::take(&mut self.pending);
251        if let Err((err, restored_block)) = self.encode_block(block, last_block) {
252            self.pending = restored_block;
253            return Err(self.fail(err));
254        }
255        if !last_block {
256            let block_capacity = self.block_capacity();
257            self.pending = self.allocate_pending_space(block_capacity);
258        }
259        Ok(())
260    }
261
262    // Exhaustive match kept intentionally: adding a new CompressionLevel
263    // variant will produce a compile error here, forcing the developer to
264    // decide whether the streaming encoder supports it before shipping.
265    fn ensure_level_supported(&self) -> Result<(), Error> {
266        match self.compression_level {
267            CompressionLevel::Uncompressed
268            | CompressionLevel::Fastest
269            | CompressionLevel::Default
270            | CompressionLevel::Better
271            | CompressionLevel::Best => Ok(()),
272        }
273    }
274
275    fn encode_block(
276        &mut self,
277        uncompressed_data: Vec<u8>,
278        last_block: bool,
279    ) -> Result<(), (Error, Vec<u8>)> {
280        let mut raw_block = Some(uncompressed_data);
281        // TODO: reuse scratch buffer across blocks to reduce allocation churn (#47)
282        let mut encoded = Vec::with_capacity(self.block_capacity() + 3);
283        let mut moved_into_matcher = false;
284        if raw_block.as_ref().is_some_and(|block| block.is_empty()) {
285            let header = BlockHeader {
286                last_block,
287                block_type: crate::blocks::block::BlockType::Raw,
288                block_size: 0,
289            };
290            header.serialize(&mut encoded);
291        } else {
292            match self.compression_level {
293                CompressionLevel::Uncompressed => {
294                    let block = raw_block.as_ref().expect("raw block missing");
295                    let header = BlockHeader {
296                        last_block,
297                        block_type: crate::blocks::block::BlockType::Raw,
298                        block_size: block.len() as u32,
299                    };
300                    header.serialize(&mut encoded);
301                    encoded.extend_from_slice(block);
302                }
303                CompressionLevel::Fastest
304                | CompressionLevel::Default
305                | CompressionLevel::Better
306                | CompressionLevel::Best => {
307                    let block = raw_block.take().expect("raw block missing");
308                    debug_assert!(!block.is_empty(), "empty blocks handled above");
309                    compress_block_encoded(&mut self.state, last_block, block, &mut encoded);
310                    moved_into_matcher = true;
311                }
312            }
313        }
314
315        if let Err(err) = self.drain_mut().and_then(|drain| drain.write_all(&encoded)) {
316            let restored = if moved_into_matcher {
317                self.state.matcher.get_last_space().to_vec()
318            } else {
319                raw_block.unwrap_or_default()
320            };
321            return Err((err, restored));
322        }
323
324        if moved_into_matcher {
325            #[cfg(feature = "hash")]
326            {
327                self.hasher.write(self.state.matcher.get_last_space());
328            }
329        } else {
330            self.hash_block(raw_block.as_deref().unwrap_or(&[]));
331        }
332        Ok(())
333    }
334
335    fn write_empty_last_block(&mut self) -> Result<(), Error> {
336        self.encode_block(Vec::new(), true).map_err(|(err, _)| err)
337    }
338
339    fn fail(&mut self, err: Error) -> Error {
340        self.errored = true;
341        if self.last_error_kind.is_none() {
342            self.last_error_kind = Some(err.kind());
343        }
344        if self.last_error_message.is_none() {
345            self.last_error_message = Some(err.to_string());
346        }
347        err
348    }
349
350    #[cfg(feature = "hash")]
351    fn hash_block(&mut self, uncompressed_data: &[u8]) {
352        self.hasher.write(uncompressed_data);
353    }
354
355    #[cfg(not(feature = "hash"))]
356    fn hash_block(&mut self, _uncompressed_data: &[u8]) {}
357}
358
359impl<W: Write, M: Matcher> Write for StreamingEncoder<W, M> {
360    fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
361        self.ensure_open()?;
362        if buf.is_empty() {
363            return Ok(0);
364        }
365
366        self.ensure_frame_started()?;
367        let block_capacity = self.block_capacity();
368        if self.pending.capacity() == 0 {
369            self.pending = self.allocate_pending_space(block_capacity);
370        }
371        let mut remaining = buf;
372        let mut consumed = 0usize;
373
374        while !remaining.is_empty() {
375            if let Some(result) = self.emit_full_pending_block(block_capacity, consumed) {
376                return result;
377            }
378
379            let available = block_capacity - self.pending.len();
380            let to_take = core::cmp::min(remaining.len(), available);
381            if to_take == 0 {
382                break;
383            }
384            self.pending.extend_from_slice(&remaining[..to_take]);
385            remaining = &remaining[to_take..];
386            consumed += to_take;
387
388            if let Some(result) = self.emit_full_pending_block(block_capacity, consumed) {
389                return result;
390            }
391        }
392        Ok(consumed)
393    }
394
395    fn flush(&mut self) -> Result<(), Error> {
396        self.ensure_open()?;
397        if self.pending.is_empty() {
398            return self
399                .drain_mut()
400                .and_then(|drain| drain.flush())
401                .map_err(|err| self.fail(err));
402        }
403        self.ensure_frame_started()?;
404        self.emit_pending_block(false)?;
405        self.drain_mut()
406            .and_then(|drain| drain.flush())
407            .map_err(|err| self.fail(err))
408    }
409}
410
411fn error_from_kind(kind: ErrorKind) -> Error {
412    Error::from(kind)
413}
414
415fn error_with_kind_message(kind: ErrorKind, message: String) -> Error {
416    #[cfg(feature = "std")]
417    {
418        Error::new(kind, message)
419    }
420    #[cfg(not(feature = "std"))]
421    {
422        Error::new(kind, alloc::boxed::Box::new(message))
423    }
424}
425
426fn invalid_input_error(message: &str) -> Error {
427    #[cfg(feature = "std")]
428    {
429        Error::new(ErrorKind::InvalidInput, message)
430    }
431    #[cfg(not(feature = "std"))]
432    {
433        Error::new(
434            ErrorKind::Other,
435            alloc::boxed::Box::new(alloc::string::String::from(message)),
436        )
437    }
438}
439
440fn other_error_owned(message: String) -> Error {
441    #[cfg(feature = "std")]
442    {
443        Error::other(message)
444    }
445    #[cfg(not(feature = "std"))]
446    {
447        Error::new(ErrorKind::Other, alloc::boxed::Box::new(message))
448    }
449}
450
451fn other_error(message: &str) -> Error {
452    #[cfg(feature = "std")]
453    {
454        Error::other(message)
455    }
456    #[cfg(not(feature = "std"))]
457    {
458        Error::new(
459            ErrorKind::Other,
460            alloc::boxed::Box::new(alloc::string::String::from(message)),
461        )
462    }
463}
464
#[cfg(test)]
mod tests {
    use crate::decoding::StreamingDecoder;
    use crate::encoding::{CompressionLevel, Matcher, Sequence, StreamingEncoder};
    use crate::io::{Error, ErrorKind, Read, Write};
    use alloc::vec;
    use alloc::vec::Vec;

    // Minimal Matcher stub with a configurable window size; it performs no
    // real match finding and replays each block as one literal run.
    struct TinyMatcher {
        last_space: Vec<u8>,
        window_size: u64,
    }

    impl TinyMatcher {
        fn new(window_size: u64) -> Self {
            Self {
                last_space: Vec::new(),
                window_size,
            }
        }
    }

    impl Matcher for TinyMatcher {
        // Hands out a fresh zeroed buffer sized to the window each time.
        fn get_next_space(&mut self) -> Vec<u8> {
            vec![0; self.window_size as usize]
        }

        fn get_last_space(&mut self) -> &[u8] {
            self.last_space.as_slice()
        }

        fn commit_space(&mut self, space: Vec<u8>) {
            self.last_space = space;
        }

        fn skip_matching(&mut self) {}

        // Emits the whole block as a single literal sequence (no matches).
        fn start_matching(&mut self, mut handle_sequence: impl for<'a> FnMut(Sequence<'a>)) {
            handle_sequence(Sequence::Literals {
                literals: self.last_space.as_slice(),
            });
        }

        fn reset(&mut self, _level: CompressionLevel) {
            self.last_space.clear();
        }

        fn window_size(&self) -> u64 {
            self.window_size
        }
    }

    // Writer that fails exactly once, on the N-th write call, and otherwise
    // records everything into `sink`.
    struct FailingWriteOnce {
        writes: usize,
        fail_on_write_number: usize,
        sink: Vec<u8>,
    }

    impl FailingWriteOnce {
        fn new(fail_on_write_number: usize) -> Self {
            Self {
                writes: 0,
                fail_on_write_number,
                sink: Vec::new(),
            }
        }
    }

    impl Write for FailingWriteOnce {
        fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
            self.writes += 1;
            if self.writes == self.fail_on_write_number {
                return Err(super::other_error("injected write failure"));
            }
            self.sink.extend_from_slice(buf);
            Ok(buf.len())
        }

        fn flush(&mut self) -> Result<(), Error> {
            Ok(())
        }
    }

    // Writer that fails on the N-th write with a caller-chosen ErrorKind,
    // so tests can check the kind is preserved through poisoning.
    struct FailingWithKind {
        writes: usize,
        fail_on_write_number: usize,
        kind: ErrorKind,
    }

    impl FailingWithKind {
        fn new(fail_on_write_number: usize, kind: ErrorKind) -> Self {
            Self {
                writes: 0,
                fail_on_write_number,
                kind,
            }
        }
    }

    impl Write for FailingWithKind {
        fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
            self.writes += 1;
            if self.writes == self.fail_on_write_number {
                return Err(Error::from(self.kind));
            }
            Ok(buf.len())
        }

        fn flush(&mut self) -> Result<(), Error> {
            Ok(())
        }
    }

    // Writer that, on the N-th write, accepts only a short prefix and then
    // fails every subsequent write — exercises write_all partial-write paths.
    struct PartialThenFailWriter {
        writes: usize,
        fail_on_write_number: usize,
        partial_prefix_len: usize,
        terminal_failure: bool,
        sink: Vec<u8>,
    }

    impl PartialThenFailWriter {
        fn new(fail_on_write_number: usize, partial_prefix_len: usize) -> Self {
            Self {
                writes: 0,
                fail_on_write_number,
                partial_prefix_len,
                terminal_failure: false,
                sink: Vec::new(),
            }
        }
    }

    impl Write for PartialThenFailWriter {
        fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
            if self.terminal_failure {
                return Err(super::other_error("injected terminal write failure"));
            }

            self.writes += 1;
            if self.writes == self.fail_on_write_number {
                let written = core::cmp::min(self.partial_prefix_len, buf.len());
                if written > 0 {
                    self.sink.extend_from_slice(&buf[..written]);
                    self.terminal_failure = true;
                    return Ok(written);
                }
                return Err(super::other_error("injected terminal write failure"));
            }

            self.sink.extend_from_slice(buf);
            Ok(buf.len())
        }

        fn flush(&mut self) -> Result<(), Error> {
            Ok(())
        }
    }

    // Chunked writes must reassemble into the original payload after a
    // compress/decompress round trip.
    #[test]
    fn streaming_encoder_roundtrip_multiple_writes() {
        let payload = b"streaming-encoder-roundtrip-".repeat(1024);
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        for chunk in payload.chunks(313) {
            encoder.write_all(chunk).unwrap();
        }
        let compressed = encoder.finish().unwrap();

        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }

    // flush() on a partially filled block must emit it (as non-last) and the
    // frame must still decode after finish().
    #[test]
    fn flush_emits_nonempty_partial_output() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.write_all(b"partial-block").unwrap();
        encoder.flush().unwrap();
        let flushed_len = encoder.get_ref().len();
        assert!(
            flushed_len > 0,
            "flush should emit header+partial block bytes"
        );
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, b"partial-block");
    }

    // flush() before any write must not start the frame (no header bytes).
    #[test]
    fn flush_without_writes_does_not_emit_frame_header() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.flush().unwrap();
        assert!(encoder.get_ref().is_empty());
    }

    // A write that exactly fills the block must emit it within the same call,
    // rather than waiting for the next write/flush.
    #[test]
    fn block_boundary_write_emits_block_in_same_call() {
        let mut boundary = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            Vec::new(),
            CompressionLevel::Uncompressed,
        );
        let mut below = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            Vec::new(),
            CompressionLevel::Uncompressed,
        );

        boundary.write_all(b"ABCD").unwrap();
        below.write_all(b"ABC").unwrap();

        let boundary_len = boundary.get_ref().len();
        let below_len = below.get_ref().len();
        assert!(
            boundary_len > below_len,
            "full block should be emitted immediately at block boundary"
        );
    }

    // finish() returns the drain and the emitted frame round-trips.
    #[test]
    fn finish_consumes_encoder_and_emits_frame() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.write_all(b"abc").unwrap();
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, b"abc");
    }

    // finish() with no writes still produces a decodable (empty) frame.
    #[test]
    fn finish_without_writes_emits_empty_frame() {
        let encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert!(decoded.is_empty());
    }

    // write(&[]) is a no-op that reports zero bytes consumed.
    #[test]
    fn write_empty_buffer_returns_zero() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        assert_eq!(encoder.write(&[]).unwrap(), 0);
        let _ = encoder.finish().unwrap();
    }

    // Raw (uncompressed) blocks must round-trip too.
    #[test]
    fn uncompressed_level_roundtrip() {
        let payload = b"uncompressed-streaming-roundtrip".repeat(64);
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Uncompressed);
        for chunk in payload.chunks(41) {
            encoder.write_all(chunk).unwrap();
        }
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }

    // Round trip at the Better level.
    #[test]
    fn better_level_streaming_roundtrip() {
        let payload = b"better-level-streaming-test".repeat(256);
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Better);
        for chunk in payload.chunks(53) {
            encoder.write_all(chunk).unwrap();
        }
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }

    // A matcher advertising window_size == 0 is rejected with InvalidInput.
    #[test]
    fn zero_window_matcher_returns_invalid_input_error() {
        let mut encoder = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(0),
            Vec::new(),
            CompressionLevel::Fastest,
        );
        let err = encoder.write_all(b"payload").unwrap_err();
        assert_eq!(err.kind(), ErrorKind::InvalidInput);
    }

    #[test]
    fn best_level_streaming_roundtrip() {
        // 200 KiB payload crosses the 128 KiB block boundary, exercising
        // multi-block emission and matcher state carry-over for Best.
        let payload = b"best-level-streaming-test".repeat(8 * 1024);
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Best);
        for chunk in payload.chunks(53) {
            encoder.write_all(chunk).unwrap();
        }
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }

    // After a write failure every subsequent operation must keep failing,
    // and nothing may have reached the sink.
    #[test]
    fn write_failure_poisoning_is_sticky() {
        let mut encoder = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            FailingWriteOnce::new(1),
            CompressionLevel::Uncompressed,
        );

        assert!(encoder.write_all(b"ABCD").is_err());
        assert!(encoder.flush().is_err());
        assert!(encoder.write_all(b"EFGH").is_err());
        assert_eq!(encoder.get_ref().sink.len(), 0);
        assert!(encoder.finish().is_err());
    }

    // The sticky error must preserve the kind of the original failure.
    #[test]
    fn poisoned_encoder_returns_original_error_kind() {
        let mut encoder = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            FailingWithKind::new(1, ErrorKind::BrokenPipe),
            CompressionLevel::Uncompressed,
        );

        let first_error = encoder.write_all(b"ABCD").unwrap_err();
        assert_eq!(first_error.kind(), ErrorKind::BrokenPipe);

        let second_error = encoder.write_all(b"EFGH").unwrap_err();
        assert_eq!(second_error.kind(), ErrorKind::BrokenPipe);
    }

    // When a later block fails mid-call, write() must report the bytes it
    // already consumed and only error on subsequent calls.
    #[test]
    fn write_reports_progress_but_poisoning_is_sticky_after_later_block_failure() {
        let payload = b"ABCDEFGHIJKL";
        let mut encoder = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            FailingWriteOnce::new(3),
            CompressionLevel::Uncompressed,
        );

        let first_write = encoder.write(payload).unwrap();
        assert_eq!(first_write, 8);
        assert!(encoder.write(&payload[first_write..]).is_err());
        assert!(encoder.flush().is_err());
        assert!(encoder.write_all(b"EFGH").is_err());
    }

    // A partial drain write followed by failure must also poison the encoder.
    #[test]
    fn partial_write_failure_after_progress_poisons_encoder() {
        let payload = b"ABCDEFGHIJKL";
        let mut encoder = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            PartialThenFailWriter::new(3, 1),
            CompressionLevel::Uncompressed,
        );

        let first_write = encoder.write(payload).unwrap();
        assert_eq!(first_write, 8);

        let second_write = encoder.write(&payload[first_write..]);
        assert!(second_write.is_err());
        assert!(encoder.flush().is_err());
        assert!(encoder.write_all(b"MNOP").is_err());
    }

    // Custom matcher construction plus get_mut() access keep the frame intact.
    #[test]
    fn new_with_matcher_and_get_mut_work() {
        let matcher = TinyMatcher::new(128 * 1024);
        let mut encoder =
            StreamingEncoder::new_with_matcher(matcher, Vec::new(), CompressionLevel::Fastest);
        encoder.get_mut().extend_from_slice(b"");
        encoder.write_all(b"custom-matcher").unwrap();
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, b"custom-matcher");
    }

    // Interop: output must be accepted by the reference C zstd implementation.
    #[cfg(feature = "std")]
    #[test]
    fn streaming_encoder_output_decompresses_with_c_zstd() {
        let payload = b"tenant=demo op=put key=streaming value=abcdef\n".repeat(4096);
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        for chunk in payload.chunks(1024) {
            encoder.write_all(chunk).unwrap();
        }
        let compressed = encoder.finish().unwrap();

        let mut decoded = Vec::with_capacity(payload.len());
        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }
}