// structured_zstd/encoding/streaming_encoder.rs

1use alloc::format;
2use alloc::string::{String, ToString};
3use alloc::vec::Vec;
4use core::mem;
5
6use crate::common::MAX_BLOCK_SIZE;
7#[cfg(feature = "hash")]
8use core::hash::Hasher;
9#[cfg(feature = "hash")]
10use twox_hash::XxHash64;
11
12use crate::encoding::levels::compress_block_encoded;
13use crate::encoding::{
14    CompressionLevel, MatchGeneratorDriver, Matcher, block_header::BlockHeader,
15    frame_compressor::CompressState, frame_compressor::FseTables, frame_header::FrameHeader,
16};
17use crate::io::{Error, ErrorKind, Write};
18
/// Incremental frame encoder that implements [`Write`].
///
/// Data can be provided with multiple `write()` calls. Full blocks are compressed
/// automatically, `flush()` emits the currently buffered partial block as non-last,
/// and `finish()` closes the frame and returns the wrapped writer.
pub struct StreamingEncoder<W: Write, M: Matcher = MatchGeneratorDriver> {
    // Output sink; `Some` until finish() takes it back out.
    drain: Option<W>,
    // Level applied to every block of this frame.
    compression_level: CompressionLevel,
    // Matcher plus entropy-table state shared with the block compressor.
    state: CompressState<M>,
    // Buffered uncompressed bytes for the current, not-yet-emitted block.
    pending: Vec<u8>,
    // Sticky poison flag: once any write/flush fails, later calls fail too.
    errored: bool,
    // Kind of the first recorded error, replayed by sticky_error().
    last_error_kind: Option<ErrorKind>,
    // Message of the first recorded error, replayed by sticky_error().
    last_error_message: Option<String>,
    // True once the frame header has been written to the drain.
    frame_started: bool,
    // Optional Frame_Content_Size pledge; enforced in write() and finish().
    pledged_content_size: Option<u64>,
    // Total uncompressed bytes accepted so far, compared against the pledge.
    bytes_consumed: u64,
    // Running XXH64 over the uncompressed payload for the frame checksum.
    #[cfg(feature = "hash")]
    hasher: XxHash64,
}
38
39impl<W: Write> StreamingEncoder<W, MatchGeneratorDriver> {
40    /// Creates a streaming encoder backed by the default match generator.
41    ///
42    /// The encoder writes compressed bytes into `drain` and applies `compression_level`
43    /// to all subsequently written blocks.
44    pub fn new(drain: W, compression_level: CompressionLevel) -> Self {
45        Self::new_with_matcher(
46            MatchGeneratorDriver::new(MAX_BLOCK_SIZE as usize, 1),
47            drain,
48            compression_level,
49        )
50    }
51}
52
53impl<W: Write, M: Matcher> StreamingEncoder<W, M> {
54    /// Creates a streaming encoder with an explicitly provided matcher implementation.
55    ///
56    /// This constructor is primarily intended for tests and advanced callers that need
57    /// custom match-window behavior.
58    pub fn new_with_matcher(matcher: M, drain: W, compression_level: CompressionLevel) -> Self {
59        Self {
60            drain: Some(drain),
61            compression_level,
62            state: CompressState {
63                matcher,
64                last_huff_table: None,
65                fse_tables: FseTables::new(),
66                offset_hist: [1, 4, 8],
67            },
68            pending: Vec::new(),
69            errored: false,
70            last_error_kind: None,
71            last_error_message: None,
72            frame_started: false,
73            pledged_content_size: None,
74            bytes_consumed: 0,
75            #[cfg(feature = "hash")]
76            hasher: XxHash64::with_seed(0),
77        }
78    }
79
80    /// Pledge the total uncompressed content size for this frame.
81    ///
82    /// When set, the frame header will include a `Frame_Content_Size` field.
83    /// This enables decoders to pre-allocate output buffers.
84    ///
85    /// Must be called **before** the first [`write`](Write::write) call;
86    /// calling it after the frame header has already been emitted returns an
87    /// error.
88    pub fn set_pledged_content_size(&mut self, size: u64) -> Result<(), Error> {
89        self.ensure_open()?;
90        if self.frame_started {
91            return Err(invalid_input_error(
92                "pledged content size must be set before the first write",
93            ));
94        }
95        self.pledged_content_size = Some(size);
96        Ok(())
97    }
98
99    /// Returns an immutable reference to the wrapped output drain.
100    ///
101    /// The drain remains available for the encoder lifetime; [`finish`](Self::finish)
102    /// consumes the encoder and returns ownership of the drain.
103    pub fn get_ref(&self) -> &W {
104        self.drain
105            .as_ref()
106            .expect("streaming encoder drain is present until finish consumes self")
107    }
108
109    /// Returns a mutable reference to the wrapped output drain.
110    ///
111    /// It is inadvisable to directly write to the underlying writer, as doing
112    /// so would corrupt the zstd frame being assembled by the encoder.
113    ///
114    /// The drain remains available for the encoder lifetime; [`finish`](Self::finish)
115    /// consumes the encoder and returns ownership of the drain.
116    pub fn get_mut(&mut self) -> &mut W {
117        self.drain
118            .as_mut()
119            .expect("streaming encoder drain is present until finish consumes self")
120    }
121
    /// Finalizes the current zstd frame and returns the wrapped output drain.
    ///
    /// If no payload was written yet, this still emits a valid empty frame.
    /// Calling this method consumes the encoder.
    pub fn finish(mut self) -> Result<W, Error> {
        self.ensure_open()?;

        // Validate the pledge before finalizing the frame. If finish() is
        // called before any writes, this also avoids emitting a header with
        // an incorrect FCS into the drain on mismatch.
        if let Some(pledged) = self.pledged_content_size
            && self.bytes_consumed != pledged
        {
            return Err(invalid_input_error(
                "pledged content size does not match bytes consumed",
            ));
        }

        // Emits the frame header now if no write() ever did (empty-frame case).
        self.ensure_frame_started()?;

        // Every frame must end with a block flagged `last_block`: either an
        // empty raw block or the buffered partial data.
        if self.pending.is_empty() {
            self.write_empty_last_block()
                .map_err(|err| self.fail(err))?;
        } else {
            self.emit_pending_block(true)?;
        }

        let mut drain = self
            .drain
            .take()
            .expect("streaming encoder drain must be present when finishing");

        // Content checksum: the low 32 bits of the running XXH64 over the
        // uncompressed payload, appended little-endian after the last block.
        #[cfg(feature = "hash")]
        {
            let checksum = self.hasher.finish() as u32;
            drain
                .write_all(&checksum.to_le_bytes())
                .map_err(|err| self.fail(err))?;
        }

        drain.flush().map_err(|err| self.fail(err))?;
        Ok(drain)
    }
165
166    fn ensure_open(&self) -> Result<(), Error> {
167        if self.errored {
168            return Err(self.sticky_error());
169        }
170        Ok(())
171    }
172
    // Cold path (only reached after poisoning). The format!() calls still allocate
    // in no_std even though error_with_kind_message/other_error_owned drop the
    // message; this is acceptable on an error recovery path to keep match arms simple.
    /// Reconstructs an error describing the first failure recorded by `fail()`,
    /// preserving as much of the original kind/message as was captured.
    fn sticky_error(&self) -> Error {
        match (self.last_error_kind, self.last_error_message.as_deref()) {
            (Some(kind), Some(message)) => error_with_kind_message(
                kind,
                format!(
                    "streaming encoder is in an errored state due to previous {kind:?} failure: {message}"
                ),
            ),
            (Some(kind), None) => error_from_kind(kind),
            (None, Some(message)) => other_error_owned(format!(
                "streaming encoder is in an errored state: {message}"
            )),
            (None, None) => other_error("streaming encoder is in an errored state"),
        }
    }
191
192    fn drain_mut(&mut self) -> Result<&mut W, Error> {
193        self.drain
194            .as_mut()
195            .ok_or_else(|| other_error("streaming encoder has no active drain"))
196    }
197
    /// Emits the frame header on first use; subsequent calls are no-ops.
    ///
    /// Also resets all per-frame compression state (matcher window, offset
    /// history, Huffman/FSE tables, checksum hasher) so a fresh frame never
    /// reuses state from a previous one.
    fn ensure_frame_started(&mut self) -> Result<(), Error> {
        if self.frame_started {
            return Ok(());
        }

        self.ensure_level_supported()?;
        self.state.matcher.reset(self.compression_level);
        // Initial repeat-offset history used at the start of a frame.
        self.state.offset_hist = [1, 4, 8];
        self.state.last_huff_table = None;
        self.state.fse_tables.ll_previous = None;
        self.state.fse_tables.ml_previous = None;
        self.state.fse_tables.of_previous = None;
        #[cfg(feature = "hash")]
        {
            self.hasher = XxHash64::with_seed(0);
        }

        let window_size = self.state.matcher.window_size();
        if window_size == 0 {
            return Err(invalid_input_error(
                "matcher reported window_size == 0, which is invalid",
            ));
        }

        let header = FrameHeader {
            frame_content_size: self.pledged_content_size,
            single_segment: false,
            content_checksum: cfg!(feature = "hash"),
            dictionary_id: None,
            window_size: Some(window_size),
        };
        let mut encoded_header = Vec::new();
        header.serialize(&mut encoded_header);
        self.drain_mut()
            .and_then(|drain| drain.write_all(&encoded_header))
            .map_err(|err| self.fail(err))?;

        // Only flag the frame as started after the header actually reached the drain.
        self.frame_started = true;
        Ok(())
    }
238
239    fn block_capacity(&self) -> usize {
240        let matcher_window = self.state.matcher.window_size() as usize;
241        core::cmp::max(1, core::cmp::min(matcher_window, MAX_BLOCK_SIZE as usize))
242    }
243
244    fn allocate_pending_space(&mut self, block_capacity: usize) -> Vec<u8> {
245        let mut space = match self.compression_level {
246            CompressionLevel::Fastest
247            | CompressionLevel::Default
248            | CompressionLevel::Better
249            | CompressionLevel::Best => self.state.matcher.get_next_space(),
250            _ => Vec::new(),
251        };
252        space.clear();
253        if space.capacity() > block_capacity {
254            space.shrink_to(block_capacity);
255        }
256        if space.capacity() < block_capacity {
257            space.reserve(block_capacity - space.capacity());
258        }
259        space
260    }
261
    /// If `pending` has reached exactly `block_capacity`, compresses and
    /// emits it as a non-last block.
    ///
    /// Returns `None` when nothing needed emitting or emission succeeded.
    /// On failure the encoder is poisoned and the caller must return the
    /// wrapped result: `Ok(consumed)` when partial progress was already made
    /// in this `write()` call (short-write semantics), otherwise the error.
    fn emit_full_pending_block(
        &mut self,
        block_capacity: usize,
        consumed: usize,
    ) -> Option<Result<usize, Error>> {
        if self.pending.len() != block_capacity {
            return None;
        }

        // Install a fresh buffer before moving the full one into the encoder.
        let new_pending = self.allocate_pending_space(block_capacity);
        let full_block = mem::replace(&mut self.pending, new_pending);
        if let Err((err, restored_block)) = self.encode_block(full_block, false) {
            // Restore the unwritten data for bookkeeping and poison the
            // encoder; any prior progress is still reported as a short write.
            self.pending = restored_block;
            let err = self.fail(err);
            if consumed > 0 {
                return Some(Ok(consumed));
            }
            return Some(Err(err));
        }
        None
    }
283
284    fn emit_pending_block(&mut self, last_block: bool) -> Result<(), Error> {
285        let block = mem::take(&mut self.pending);
286        if let Err((err, restored_block)) = self.encode_block(block, last_block) {
287            self.pending = restored_block;
288            return Err(self.fail(err));
289        }
290        if !last_block {
291            let block_capacity = self.block_capacity();
292            self.pending = self.allocate_pending_space(block_capacity);
293        }
294        Ok(())
295    }
296
    // Exhaustive match kept intentionally: adding a new CompressionLevel
    // variant will produce a compile error here, forcing the developer to
    // decide whether the streaming encoder supports it before shipping.
    /// Verifies the configured level is one this streaming encoder supports.
    fn ensure_level_supported(&self) -> Result<(), Error> {
        match self.compression_level {
            CompressionLevel::Uncompressed
            | CompressionLevel::Fastest
            | CompressionLevel::Default
            | CompressionLevel::Better
            | CompressionLevel::Best => Ok(()),
        }
    }
309
    /// Serializes one block (header plus payload) and writes it to the drain.
    ///
    /// On failure returns the error together with the uncompressed input so
    /// the caller can restore it into `pending`. On success the uncompressed
    /// bytes are folded into the frame checksum (when `hash` is enabled).
    fn encode_block(
        &mut self,
        uncompressed_data: Vec<u8>,
        last_block: bool,
    ) -> Result<(), (Error, Vec<u8>)> {
        // Holds the input until it is either borrowed (raw path) or moved
        // into the matcher (compressed path).
        let mut raw_block = Some(uncompressed_data);
        // TODO: reuse scratch buffer across blocks to reduce allocation churn (#47)
        let mut encoded = Vec::with_capacity(self.block_capacity() + 3);
        let mut moved_into_matcher = false;
        if raw_block.as_ref().is_some_and(|block| block.is_empty()) {
            // Empty input: a zero-length raw block (used as the last-block
            // terminator by finish()).
            let header = BlockHeader {
                last_block,
                block_type: crate::blocks::block::BlockType::Raw,
                block_size: 0,
            };
            header.serialize(&mut encoded);
        } else {
            match self.compression_level {
                CompressionLevel::Uncompressed => {
                    // Raw block: header followed by the payload verbatim.
                    let block = raw_block.as_ref().expect("raw block missing");
                    let header = BlockHeader {
                        last_block,
                        block_type: crate::blocks::block::BlockType::Raw,
                        block_size: block.len() as u32,
                    };
                    header.serialize(&mut encoded);
                    encoded.extend_from_slice(block);
                }
                CompressionLevel::Fastest
                | CompressionLevel::Default
                | CompressionLevel::Better
                | CompressionLevel::Best => {
                    // Compressed block: ownership of the data moves into the
                    // matcher; it stays reachable via get_last_space().
                    let block = raw_block.take().expect("raw block missing");
                    debug_assert!(!block.is_empty(), "empty blocks handled above");
                    compress_block_encoded(&mut self.state, last_block, block, &mut encoded);
                    moved_into_matcher = true;
                }
            }
        }

        if let Err(err) = self.drain_mut().and_then(|drain| drain.write_all(&encoded)) {
            // Hand the input back to the caller: either a copy of the bytes
            // the matcher now owns, or the still-unmoved raw block.
            let restored = if moved_into_matcher {
                self.state.matcher.get_last_space().to_vec()
            } else {
                raw_block.unwrap_or_default()
            };
            return Err((err, restored));
        }

        if moved_into_matcher {
            // Compressed path: hash the copy retained inside the matcher.
            #[cfg(feature = "hash")]
            {
                self.hasher.write(self.state.matcher.get_last_space());
            }
        } else {
            self.hash_block(raw_block.as_deref().unwrap_or(&[]));
        }
        Ok(())
    }
369
370    fn write_empty_last_block(&mut self) -> Result<(), Error> {
371        self.encode_block(Vec::new(), true).map_err(|(err, _)| err)
372    }
373
374    fn fail(&mut self, err: Error) -> Error {
375        self.errored = true;
376        if self.last_error_kind.is_none() {
377            self.last_error_kind = Some(err.kind());
378        }
379        if self.last_error_message.is_none() {
380            self.last_error_message = Some(err.to_string());
381        }
382        err
383    }
384
    /// Folds uncompressed payload bytes into the frame checksum.
    #[cfg(feature = "hash")]
    fn hash_block(&mut self, uncompressed_data: &[u8]) {
        self.hasher.write(uncompressed_data);
    }

    /// No-op stand-in so call sites need no cfg when `hash` is disabled.
    #[cfg(not(feature = "hash"))]
    fn hash_block(&mut self, _uncompressed_data: &[u8]) {}
392}
393
394impl<W: Write, M: Matcher> Write for StreamingEncoder<W, M> {
395    fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
396        self.ensure_open()?;
397        if buf.is_empty() {
398            return Ok(0);
399        }
400
401        // Check pledge before emitting the frame header so that a misuse
402        // like set_pledged_content_size(0) + write(non_empty) doesn't leave
403        // a partially-written header in the drain.
404        if let Some(pledged) = self.pledged_content_size
405            && self.bytes_consumed >= pledged
406        {
407            return Err(invalid_input_error(
408                "write would exceed pledged content size",
409            ));
410        }
411
412        self.ensure_frame_started()?;
413
414        // Enforce pledged upper bound: truncate the accepted slice to the
415        // remaining allowance so that partial-write semantics are honored
416        // (return Ok(n) with n < buf.len()) instead of failing the full call.
417        let buf = if let Some(pledged) = self.pledged_content_size {
418            let remaining_allowed = pledged
419                .checked_sub(self.bytes_consumed)
420                .ok_or_else(|| invalid_input_error("bytes consumed exceed pledged content size"))?;
421            if remaining_allowed == 0 {
422                return Err(invalid_input_error(
423                    "write would exceed pledged content size",
424                ));
425            }
426            let accepted = core::cmp::min(
427                buf.len(),
428                usize::try_from(remaining_allowed).unwrap_or(usize::MAX),
429            );
430            &buf[..accepted]
431        } else {
432            buf
433        };
434
435        let block_capacity = self.block_capacity();
436        if self.pending.capacity() == 0 {
437            self.pending = self.allocate_pending_space(block_capacity);
438        }
439        let mut remaining = buf;
440        let mut consumed = 0usize;
441
442        while !remaining.is_empty() {
443            if let Some(result) = self.emit_full_pending_block(block_capacity, consumed) {
444                return result;
445            }
446
447            let available = block_capacity - self.pending.len();
448            let to_take = core::cmp::min(remaining.len(), available);
449            if to_take == 0 {
450                break;
451            }
452            self.pending.extend_from_slice(&remaining[..to_take]);
453            remaining = &remaining[to_take..];
454            consumed += to_take;
455
456            if let Some(result) = self.emit_full_pending_block(block_capacity, consumed) {
457                if let Ok(n) = &result {
458                    self.bytes_consumed += *n as u64;
459                }
460                return result;
461            }
462        }
463        self.bytes_consumed += consumed as u64;
464        Ok(consumed)
465    }
466
467    fn flush(&mut self) -> Result<(), Error> {
468        self.ensure_open()?;
469        if self.pending.is_empty() {
470            return self
471                .drain_mut()
472                .and_then(|drain| drain.flush())
473                .map_err(|err| self.fail(err));
474        }
475        self.ensure_frame_started()?;
476        self.emit_pending_block(false)?;
477        self.drain_mut()
478            .and_then(|drain| drain.flush())
479            .map_err(|err| self.fail(err))
480    }
481}
482
483fn error_from_kind(kind: ErrorKind) -> Error {
484    Error::from(kind)
485}
486
/// Builds an [`Error`] with both a kind and an owned message.
///
/// Under `no_std` the message is boxed, since that `Error::new` takes a
/// boxed payload rather than `impl Into<..>`.
fn error_with_kind_message(kind: ErrorKind, message: String) -> Error {
    #[cfg(feature = "std")]
    {
        Error::new(kind, message)
    }
    #[cfg(not(feature = "std"))]
    {
        Error::new(kind, alloc::boxed::Box::new(message))
    }
}
497
/// Builds an error for caller misuse (bad arguments / protocol violations).
///
/// NOTE(review): under `no_std` the kind degrades to `ErrorKind::Other`
/// instead of `InvalidInput` — presumably the no_std `ErrorKind` has no
/// `InvalidInput` variant; confirm no caller matches on the kind there.
fn invalid_input_error(message: &str) -> Error {
    #[cfg(feature = "std")]
    {
        Error::new(ErrorKind::InvalidInput, message)
    }
    #[cfg(not(feature = "std"))]
    {
        Error::new(
            ErrorKind::Other,
            alloc::boxed::Box::new(alloc::string::String::from(message)),
        )
    }
}
511
/// Builds an `Other`-kind error from an already-owned message.
fn other_error_owned(message: String) -> Error {
    #[cfg(feature = "std")]
    {
        Error::other(message)
    }
    #[cfg(not(feature = "std"))]
    {
        Error::new(ErrorKind::Other, alloc::boxed::Box::new(message))
    }
}
522
/// Builds an `Other`-kind error from a static message (allocates under `no_std`).
fn other_error(message: &str) -> Error {
    #[cfg(feature = "std")]
    {
        Error::other(message)
    }
    #[cfg(not(feature = "std"))]
    {
        Error::new(
            ErrorKind::Other,
            alloc::boxed::Box::new(alloc::string::String::from(message)),
        )
    }
}
536
537#[cfg(test)]
538mod tests {
539    use crate::decoding::StreamingDecoder;
540    use crate::encoding::{CompressionLevel, Matcher, Sequence, StreamingEncoder};
541    use crate::io::{Error, ErrorKind, Read, Write};
542    use alloc::vec;
543    use alloc::vec::Vec;
544
545    struct TinyMatcher {
546        last_space: Vec<u8>,
547        window_size: u64,
548    }
549
550    impl TinyMatcher {
551        fn new(window_size: u64) -> Self {
552            Self {
553                last_space: Vec::new(),
554                window_size,
555            }
556        }
557    }
558
559    impl Matcher for TinyMatcher {
560        fn get_next_space(&mut self) -> Vec<u8> {
561            vec![0; self.window_size as usize]
562        }
563
564        fn get_last_space(&mut self) -> &[u8] {
565            self.last_space.as_slice()
566        }
567
568        fn commit_space(&mut self, space: Vec<u8>) {
569            self.last_space = space;
570        }
571
572        fn skip_matching(&mut self) {}
573
574        fn start_matching(&mut self, mut handle_sequence: impl for<'a> FnMut(Sequence<'a>)) {
575            handle_sequence(Sequence::Literals {
576                literals: self.last_space.as_slice(),
577            });
578        }
579
580        fn reset(&mut self, _level: CompressionLevel) {
581            self.last_space.clear();
582        }
583
584        fn window_size(&self) -> u64 {
585            self.window_size
586        }
587    }
588
589    struct FailingWriteOnce {
590        writes: usize,
591        fail_on_write_number: usize,
592        sink: Vec<u8>,
593    }
594
595    impl FailingWriteOnce {
596        fn new(fail_on_write_number: usize) -> Self {
597            Self {
598                writes: 0,
599                fail_on_write_number,
600                sink: Vec::new(),
601            }
602        }
603    }
604
605    impl Write for FailingWriteOnce {
606        fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
607            self.writes += 1;
608            if self.writes == self.fail_on_write_number {
609                return Err(super::other_error("injected write failure"));
610            }
611            self.sink.extend_from_slice(buf);
612            Ok(buf.len())
613        }
614
615        fn flush(&mut self) -> Result<(), Error> {
616            Ok(())
617        }
618    }
619
620    struct FailingWithKind {
621        writes: usize,
622        fail_on_write_number: usize,
623        kind: ErrorKind,
624    }
625
626    impl FailingWithKind {
627        fn new(fail_on_write_number: usize, kind: ErrorKind) -> Self {
628            Self {
629                writes: 0,
630                fail_on_write_number,
631                kind,
632            }
633        }
634    }
635
636    impl Write for FailingWithKind {
637        fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
638            self.writes += 1;
639            if self.writes == self.fail_on_write_number {
640                return Err(Error::from(self.kind));
641            }
642            Ok(buf.len())
643        }
644
645        fn flush(&mut self) -> Result<(), Error> {
646            Ok(())
647        }
648    }
649
650    struct PartialThenFailWriter {
651        writes: usize,
652        fail_on_write_number: usize,
653        partial_prefix_len: usize,
654        terminal_failure: bool,
655        sink: Vec<u8>,
656    }
657
658    impl PartialThenFailWriter {
659        fn new(fail_on_write_number: usize, partial_prefix_len: usize) -> Self {
660            Self {
661                writes: 0,
662                fail_on_write_number,
663                partial_prefix_len,
664                terminal_failure: false,
665                sink: Vec::new(),
666            }
667        }
668    }
669
670    impl Write for PartialThenFailWriter {
671        fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
672            if self.terminal_failure {
673                return Err(super::other_error("injected terminal write failure"));
674            }
675
676            self.writes += 1;
677            if self.writes == self.fail_on_write_number {
678                let written = core::cmp::min(self.partial_prefix_len, buf.len());
679                if written > 0 {
680                    self.sink.extend_from_slice(&buf[..written]);
681                    self.terminal_failure = true;
682                    return Ok(written);
683                }
684                return Err(super::other_error("injected terminal write failure"));
685            }
686
687            self.sink.extend_from_slice(buf);
688            Ok(buf.len())
689        }
690
691        fn flush(&mut self) -> Result<(), Error> {
692            Ok(())
693        }
694    }
695
696    #[test]
697    fn streaming_encoder_roundtrip_multiple_writes() {
698        let payload = b"streaming-encoder-roundtrip-".repeat(1024);
699        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
700        for chunk in payload.chunks(313) {
701            encoder.write_all(chunk).unwrap();
702        }
703        let compressed = encoder.finish().unwrap();
704
705        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
706        let mut decoded = Vec::new();
707        decoder.read_to_end(&mut decoded).unwrap();
708        assert_eq!(decoded, payload);
709    }
710
711    #[test]
712    fn flush_emits_nonempty_partial_output() {
713        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
714        encoder.write_all(b"partial-block").unwrap();
715        encoder.flush().unwrap();
716        let flushed_len = encoder.get_ref().len();
717        assert!(
718            flushed_len > 0,
719            "flush should emit header+partial block bytes"
720        );
721        let compressed = encoder.finish().unwrap();
722        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
723        let mut decoded = Vec::new();
724        decoder.read_to_end(&mut decoded).unwrap();
725        assert_eq!(decoded, b"partial-block");
726    }
727
728    #[test]
729    fn flush_without_writes_does_not_emit_frame_header() {
730        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
731        encoder.flush().unwrap();
732        assert!(encoder.get_ref().is_empty());
733    }
734
735    #[test]
736    fn block_boundary_write_emits_block_in_same_call() {
737        let mut boundary = StreamingEncoder::new_with_matcher(
738            TinyMatcher::new(4),
739            Vec::new(),
740            CompressionLevel::Uncompressed,
741        );
742        let mut below = StreamingEncoder::new_with_matcher(
743            TinyMatcher::new(4),
744            Vec::new(),
745            CompressionLevel::Uncompressed,
746        );
747
748        boundary.write_all(b"ABCD").unwrap();
749        below.write_all(b"ABC").unwrap();
750
751        let boundary_len = boundary.get_ref().len();
752        let below_len = below.get_ref().len();
753        assert!(
754            boundary_len > below_len,
755            "full block should be emitted immediately at block boundary"
756        );
757    }
758
759    #[test]
760    fn finish_consumes_encoder_and_emits_frame() {
761        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
762        encoder.write_all(b"abc").unwrap();
763        let compressed = encoder.finish().unwrap();
764        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
765        let mut decoded = Vec::new();
766        decoder.read_to_end(&mut decoded).unwrap();
767        assert_eq!(decoded, b"abc");
768    }
769
770    #[test]
771    fn finish_without_writes_emits_empty_frame() {
772        let encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
773        let compressed = encoder.finish().unwrap();
774        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
775        let mut decoded = Vec::new();
776        decoder.read_to_end(&mut decoded).unwrap();
777        assert!(decoded.is_empty());
778    }
779
780    #[test]
781    fn write_empty_buffer_returns_zero() {
782        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
783        assert_eq!(encoder.write(&[]).unwrap(), 0);
784        let _ = encoder.finish().unwrap();
785    }
786
787    #[test]
788    fn uncompressed_level_roundtrip() {
789        let payload = b"uncompressed-streaming-roundtrip".repeat(64);
790        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Uncompressed);
791        for chunk in payload.chunks(41) {
792            encoder.write_all(chunk).unwrap();
793        }
794        let compressed = encoder.finish().unwrap();
795        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
796        let mut decoded = Vec::new();
797        decoder.read_to_end(&mut decoded).unwrap();
798        assert_eq!(decoded, payload);
799    }
800
801    #[test]
802    fn better_level_streaming_roundtrip() {
803        let payload = b"better-level-streaming-test".repeat(256);
804        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Better);
805        for chunk in payload.chunks(53) {
806            encoder.write_all(chunk).unwrap();
807        }
808        let compressed = encoder.finish().unwrap();
809        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
810        let mut decoded = Vec::new();
811        decoder.read_to_end(&mut decoded).unwrap();
812        assert_eq!(decoded, payload);
813    }
814
815    #[test]
816    fn zero_window_matcher_returns_invalid_input_error() {
817        let mut encoder = StreamingEncoder::new_with_matcher(
818            TinyMatcher::new(0),
819            Vec::new(),
820            CompressionLevel::Fastest,
821        );
822        let err = encoder.write_all(b"payload").unwrap_err();
823        assert_eq!(err.kind(), ErrorKind::InvalidInput);
824    }
825
826    #[test]
827    fn best_level_streaming_roundtrip() {
828        // 200 KiB payload crosses the 128 KiB block boundary, exercising
829        // multi-block emission and matcher state carry-over for Best.
830        let payload = b"best-level-streaming-test".repeat(8 * 1024);
831        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Best);
832        for chunk in payload.chunks(53) {
833            encoder.write_all(chunk).unwrap();
834        }
835        let compressed = encoder.finish().unwrap();
836        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
837        let mut decoded = Vec::new();
838        decoder.read_to_end(&mut decoded).unwrap();
839        assert_eq!(decoded, payload);
840    }
841
842    #[test]
843    fn write_failure_poisoning_is_sticky() {
844        let mut encoder = StreamingEncoder::new_with_matcher(
845            TinyMatcher::new(4),
846            FailingWriteOnce::new(1),
847            CompressionLevel::Uncompressed,
848        );
849
850        assert!(encoder.write_all(b"ABCD").is_err());
851        assert!(encoder.flush().is_err());
852        assert!(encoder.write_all(b"EFGH").is_err());
853        assert_eq!(encoder.get_ref().sink.len(), 0);
854        assert!(encoder.finish().is_err());
855    }
856
857    #[test]
858    fn poisoned_encoder_returns_original_error_kind() {
859        let mut encoder = StreamingEncoder::new_with_matcher(
860            TinyMatcher::new(4),
861            FailingWithKind::new(1, ErrorKind::BrokenPipe),
862            CompressionLevel::Uncompressed,
863        );
864
865        let first_error = encoder.write_all(b"ABCD").unwrap_err();
866        assert_eq!(first_error.kind(), ErrorKind::BrokenPipe);
867
868        let second_error = encoder.write_all(b"EFGH").unwrap_err();
869        assert_eq!(second_error.kind(), ErrorKind::BrokenPipe);
870    }
871
872    #[test]
873    fn write_reports_progress_but_poisoning_is_sticky_after_later_block_failure() {
874        let payload = b"ABCDEFGHIJKL";
875        let mut encoder = StreamingEncoder::new_with_matcher(
876            TinyMatcher::new(4),
877            FailingWriteOnce::new(3),
878            CompressionLevel::Uncompressed,
879        );
880
881        let first_write = encoder.write(payload).unwrap();
882        assert_eq!(first_write, 8);
883        assert!(encoder.write(&payload[first_write..]).is_err());
884        assert!(encoder.flush().is_err());
885        assert!(encoder.write_all(b"EFGH").is_err());
886    }
887
888    #[test]
889    fn partial_write_failure_after_progress_poisons_encoder() {
890        let payload = b"ABCDEFGHIJKL";
891        let mut encoder = StreamingEncoder::new_with_matcher(
892            TinyMatcher::new(4),
893            PartialThenFailWriter::new(3, 1),
894            CompressionLevel::Uncompressed,
895        );
896
897        let first_write = encoder.write(payload).unwrap();
898        assert_eq!(first_write, 8);
899
900        let second_write = encoder.write(&payload[first_write..]);
901        assert!(second_write.is_err());
902        assert!(encoder.flush().is_err());
903        assert!(encoder.write_all(b"MNOP").is_err());
904    }
905
906    #[test]
907    fn new_with_matcher_and_get_mut_work() {
908        let matcher = TinyMatcher::new(128 * 1024);
909        let mut encoder =
910            StreamingEncoder::new_with_matcher(matcher, Vec::new(), CompressionLevel::Fastest);
911        encoder.get_mut().extend_from_slice(b"");
912        encoder.write_all(b"custom-matcher").unwrap();
913        let compressed = encoder.finish().unwrap();
914        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
915        let mut decoded = Vec::new();
916        decoder.read_to_end(&mut decoded).unwrap();
917        assert_eq!(decoded, b"custom-matcher");
918    }
919
920    #[cfg(feature = "std")]
921    #[test]
922    fn streaming_encoder_output_decompresses_with_c_zstd() {
923        let payload = b"tenant=demo op=put key=streaming value=abcdef\n".repeat(4096);
924        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
925        for chunk in payload.chunks(1024) {
926            encoder.write_all(chunk).unwrap();
927        }
928        let compressed = encoder.finish().unwrap();
929
930        let mut decoded = Vec::with_capacity(payload.len());
931        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
932        assert_eq!(decoded, payload);
933    }
934
935    #[test]
936    fn pledged_content_size_written_in_header() {
937        let payload = b"hello world, pledged size test";
938        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
939        encoder
940            .set_pledged_content_size(payload.len() as u64)
941            .unwrap();
942        encoder.write_all(payload).unwrap();
943        let compressed = encoder.finish().unwrap();
944
945        // Verify FCS is present and correct
946        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
947            .unwrap()
948            .0;
949        assert_eq!(header.frame_content_size(), payload.len() as u64);
950
951        // Verify roundtrip
952        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
953        let mut decoded = Vec::new();
954        decoder.read_to_end(&mut decoded).unwrap();
955        assert_eq!(decoded, payload);
956    }
957
958    #[test]
959    fn pledged_content_size_mismatch_returns_error() {
960        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
961        encoder.set_pledged_content_size(100).unwrap();
962        encoder.write_all(b"short payload").unwrap(); // 13 bytes != 100 pledged
963        let err = encoder.finish().unwrap_err();
964        assert_eq!(err.kind(), ErrorKind::InvalidInput);
965    }
966
967    #[test]
968    fn write_exceeding_pledge_returns_error() {
969        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
970        encoder.set_pledged_content_size(5).unwrap();
971        let err = encoder.write_all(b"exceeds five bytes").unwrap_err();
972        assert_eq!(err.kind(), ErrorKind::InvalidInput);
973    }
974
975    #[test]
976    fn write_straddling_pledge_reports_partial_progress() {
977        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
978        encoder.set_pledged_content_size(5).unwrap();
979        // write() should accept exactly 5 bytes (partial progress)
980        assert_eq!(encoder.write(b"abcdef").unwrap(), 5);
981        // Next write should fail — pledge exhausted
982        let err = encoder.write(b"g").unwrap_err();
983        assert_eq!(err.kind(), ErrorKind::InvalidInput);
984    }
985
986    #[test]
987    fn pledged_content_size_after_write_returns_error() {
988        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
989        encoder.write_all(b"already writing").unwrap();
990        let err = encoder.set_pledged_content_size(15).unwrap_err();
991        assert_eq!(err.kind(), ErrorKind::InvalidInput);
992    }
993
994    #[cfg(feature = "std")]
995    #[test]
996    fn pledged_content_size_c_zstd_compatible() {
997        let payload = b"tenant=demo op=put key=streaming value=abcdef\n".repeat(4096);
998        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
999        encoder
1000            .set_pledged_content_size(payload.len() as u64)
1001            .unwrap();
1002        for chunk in payload.chunks(1024) {
1003            encoder.write_all(chunk).unwrap();
1004        }
1005        let compressed = encoder.finish().unwrap();
1006
1007        // FCS should be written
1008        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1009            .unwrap()
1010            .0;
1011        assert_eq!(header.frame_content_size(), payload.len() as u64);
1012
1013        // C zstd should decompress successfully
1014        let mut decoded = Vec::new();
1015        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
1016        assert_eq!(decoded, payload);
1017    }
1018
1019    #[test]
1020    fn no_pledged_size_omits_fcs_from_header() {
1021        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1022        encoder.write_all(b"no pledged size").unwrap();
1023        let compressed = encoder.finish().unwrap();
1024
1025        // FCS should be omitted from the header; the decoder reports absent FCS as 0.
1026        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1027            .unwrap()
1028            .0;
1029        assert_eq!(header.frame_content_size(), 0);
1030        // Verify the descriptor confirms FCS field is truly absent (0 bytes),
1031        // not just FCS present with value 0.
1032        assert_eq!(header.descriptor.frame_content_size_bytes().unwrap(), 0);
1033    }
1034}