//! structured_zstd/encoding/streaming_encoder.rs — streaming zstd frame encoder.

1use alloc::format;
2use alloc::string::{String, ToString};
3use alloc::vec::Vec;
4use core::mem;
5
6use crate::common::MAX_BLOCK_SIZE;
7#[cfg(feature = "hash")]
8use core::hash::Hasher;
9#[cfg(feature = "hash")]
10use twox_hash::XxHash64;
11
12use crate::encoding::levels::compress_block_encoded;
13use crate::encoding::{
14    CompressionLevel, MatchGeneratorDriver, Matcher, block_header::BlockHeader,
15    frame_compressor::CompressState, frame_compressor::FseTables, frame_header::FrameHeader,
16};
17use crate::io::{Error, ErrorKind, Write};
18
/// Incremental frame encoder that implements [`Write`].
///
/// Data can be provided with multiple `write()` calls. Full blocks are compressed
/// automatically, `flush()` emits the currently buffered partial block as non-last,
/// and `finish()` closes the frame and returns the wrapped writer.
pub struct StreamingEncoder<W: Write, M: Matcher = MatchGeneratorDriver> {
    // Output sink; `Some` until finish() takes ownership back out.
    drain: Option<W>,
    // Level applied to every block written through this encoder.
    compression_level: CompressionLevel,
    // Matcher plus entropy-table state carried across blocks.
    state: CompressState<M>,
    // Uncompressed bytes buffered until a full block can be emitted.
    pending: Vec<u8>,
    // Reusable staging buffer for a block's encoded form (avoids per-block allocations).
    encoded_scratch: Vec<u8>,
    // Once true, every subsequent call fails with sticky_error().
    errored: bool,
    // Kind/message of the first failure, used to rebuild the sticky error.
    last_error_kind: Option<ErrorKind>,
    last_error_message: Option<String>,
    // True after the frame header has been written to the drain.
    frame_started: bool,
    // Exact content size promised via set_pledged_content_size(), if any.
    pledged_content_size: Option<u64>,
    // Total uncompressed bytes accepted so far (validated against the pledge).
    bytes_consumed: u64,
    // Running XXH64 over the uncompressed payload for the frame checksum.
    #[cfg(feature = "hash")]
    hasher: XxHash64,
}
39
40impl<W: Write> StreamingEncoder<W, MatchGeneratorDriver> {
41    /// Creates a streaming encoder backed by the default match generator.
42    ///
43    /// The encoder writes compressed bytes into `drain` and applies `compression_level`
44    /// to all subsequently written blocks.
45    pub fn new(drain: W, compression_level: CompressionLevel) -> Self {
46        Self::new_with_matcher(
47            MatchGeneratorDriver::new(MAX_BLOCK_SIZE as usize, 1),
48            drain,
49            compression_level,
50        )
51    }
52}
53
impl<W: Write, M: Matcher> StreamingEncoder<W, M> {
    /// Creates a streaming encoder with an explicitly provided matcher implementation.
    ///
    /// This constructor is primarily intended for tests and advanced callers that need
    /// custom match-window behavior.
    pub fn new_with_matcher(matcher: M, drain: W, compression_level: CompressionLevel) -> Self {
        Self {
            drain: Some(drain),
            compression_level,
            state: CompressState {
                matcher,
                last_huff_table: None,
                fse_tables: FseTables::new(),
                // Initial repeat-offset history [1, 4, 8] mandated by the zstd
                // format (RFC 8878).
                offset_hist: [1, 4, 8],
            },
            pending: Vec::new(),
            encoded_scratch: Vec::new(),
            errored: false,
            last_error_kind: None,
            last_error_message: None,
            frame_started: false,
            pledged_content_size: None,
            bytes_consumed: 0,
            #[cfg(feature = "hash")]
            hasher: XxHash64::with_seed(0),
        }
    }

    /// Pledge the total uncompressed content size for this frame.
    ///
    /// When set, the frame header will include a `Frame_Content_Size` field.
    /// This enables decoders to pre-allocate output buffers.
    /// The pledged size is also forwarded as a source-size hint to the
    /// matcher so small inputs can use smaller matching tables.
    ///
    /// Must be called **before** the first [`write`](Write::write) call;
    /// calling it after the frame header has already been emitted returns an
    /// error.
    pub fn set_pledged_content_size(&mut self, size: u64) -> Result<(), Error> {
        self.ensure_open()?;
        if self.frame_started {
            return Err(invalid_input_error(
                "pledged content size must be set before the first write",
            ));
        }
        self.pledged_content_size = Some(size);
        // Also use pledged size as source-size hint so the matcher
        // can select smaller tables for small inputs.
        self.state.matcher.set_source_size_hint(size);
        Ok(())
    }

    /// Provide a hint about the total uncompressed size for the next frame.
    ///
    /// Unlike [`set_pledged_content_size`](Self::set_pledged_content_size),
    /// this does **not** enforce that exactly `size` bytes are written; it
    /// may reduce matcher tables, advertised frame window, and block sizing
    /// for small inputs. Must be called before the first
    /// [`write`](Write::write).
    pub fn set_source_size_hint(&mut self, size: u64) -> Result<(), Error> {
        self.ensure_open()?;
        if self.frame_started {
            return Err(invalid_input_error(
                "source size hint must be set before the first write",
            ));
        }
        self.state.matcher.set_source_size_hint(size);
        Ok(())
    }

    /// Returns an immutable reference to the wrapped output drain.
    ///
    /// The drain remains available for the encoder lifetime; [`finish`](Self::finish)
    /// consumes the encoder and returns ownership of the drain.
    pub fn get_ref(&self) -> &W {
        self.drain
            .as_ref()
            .expect("streaming encoder drain is present until finish consumes self")
    }

    /// Returns a mutable reference to the wrapped output drain.
    ///
    /// It is inadvisable to directly write to the underlying writer, as doing
    /// so would corrupt the zstd frame being assembled by the encoder.
    ///
    /// The drain remains available for the encoder lifetime; [`finish`](Self::finish)
    /// consumes the encoder and returns ownership of the drain.
    pub fn get_mut(&mut self) -> &mut W {
        self.drain
            .as_mut()
            .expect("streaming encoder drain is present until finish consumes self")
    }

    /// Finalizes the current zstd frame and returns the wrapped output drain.
    ///
    /// If no payload was written yet, this still emits a valid empty frame.
    /// Calling this method consumes the encoder.
    pub fn finish(mut self) -> Result<W, Error> {
        self.ensure_open()?;

        // Validate the pledge before finalizing the frame. If finish() is
        // called before any writes, this also avoids emitting a header with
        // an incorrect FCS into the drain on mismatch.
        if let Some(pledged) = self.pledged_content_size
            && self.bytes_consumed != pledged
        {
            return Err(invalid_input_error(
                "pledged content size does not match bytes consumed",
            ));
        }

        self.ensure_frame_started()?;

        // A last block is always emitted — even an empty one — so the frame
        // is properly terminated.
        if self.pending.is_empty() {
            self.write_empty_last_block()
                .map_err(|err| self.fail(err))?;
        } else {
            self.emit_pending_block(true)?;
        }

        let mut drain = self
            .drain
            .take()
            .expect("streaming encoder drain must be present when finishing");

        // With the `hash` feature the frame is closed with a 4-byte checksum:
        // the low 32 bits of the XXH64 over all uncompressed payload bytes.
        #[cfg(feature = "hash")]
        {
            let checksum = self.hasher.finish() as u32;
            drain
                .write_all(&checksum.to_le_bytes())
                .map_err(|err| self.fail(err))?;
        }

        drain.flush().map_err(|err| self.fail(err))?;
        Ok(drain)
    }

    // Returns the sticky error if a previous failure poisoned the encoder.
    fn ensure_open(&self) -> Result<(), Error> {
        if self.errored {
            return Err(self.sticky_error());
        }
        Ok(())
    }

    // Cold path (only reached after poisoning). The format!() calls still allocate
    // in no_std even though error_with_kind_message/other_error_owned drop the
    // message; this is acceptable on an error recovery path to keep match arms simple.
    fn sticky_error(&self) -> Error {
        match (self.last_error_kind, self.last_error_message.as_deref()) {
            (Some(kind), Some(message)) => error_with_kind_message(
                kind,
                format!(
                    "streaming encoder is in an errored state due to previous {kind:?} failure: {message}"
                ),
            ),
            (Some(kind), None) => error_from_kind(kind),
            (None, Some(message)) => other_error_owned(format!(
                "streaming encoder is in an errored state: {message}"
            )),
            (None, None) => other_error("streaming encoder is in an errored state"),
        }
    }

    // Borrows the drain, or errors if finish() already took it.
    fn drain_mut(&mut self) -> Result<&mut W, Error> {
        self.drain
            .as_mut()
            .ok_or_else(|| other_error("streaming encoder has no active drain"))
    }

    // Lazily emits the frame header on the first write/flush/finish, after
    // resetting all per-frame matcher and entropy-table state. Idempotent once
    // the frame has started.
    fn ensure_frame_started(&mut self) -> Result<(), Error> {
        if self.frame_started {
            return Ok(());
        }

        self.ensure_level_supported()?;
        self.state.matcher.reset(self.compression_level);
        self.state.offset_hist = [1, 4, 8];
        self.state.last_huff_table = None;
        self.state.fse_tables.ll_previous = None;
        self.state.fse_tables.ml_previous = None;
        self.state.fse_tables.of_previous = None;
        #[cfg(feature = "hash")]
        {
            self.hasher = XxHash64::with_seed(0);
        }

        let window_size = self.state.matcher.window_size();
        if window_size == 0 {
            return Err(invalid_input_error(
                "matcher reported window_size == 0, which is invalid",
            ));
        }

        // FrameCompressor gates single-segment on dictionary usage state; the
        // streaming encoder currently has no dictionary API/state, so we only
        // gate on pledged size and window reach here.
        // TODO: if streaming dictionary support is added, mirror the
        // !use_dictionary_state guard from FrameCompressor.
        let single_segment = self
            .pledged_content_size
            .map(|size| (512..=(1 << 14)).contains(&size) && size <= window_size)
            .unwrap_or(false);

        let header = FrameHeader {
            frame_content_size: self.pledged_content_size,
            single_segment,
            content_checksum: cfg!(feature = "hash"),
            dictionary_id: None,
            // Single-segment frames omit the window descriptor; the content
            // size field stands in for it.
            window_size: if single_segment {
                None
            } else {
                Some(window_size)
            },
        };
        let mut encoded_header = Vec::new();
        header.serialize(&mut encoded_header);
        self.drain_mut()
            .and_then(|drain| drain.write_all(&encoded_header))
            .map_err(|err| self.fail(err))?;

        self.frame_started = true;
        Ok(())
    }

    // Uncompressed bytes buffered per block: the matcher window clamped to the
    // format's maximum block size, and never zero.
    fn block_capacity(&self) -> usize {
        let matcher_window = self.state.matcher.window_size() as usize;
        core::cmp::max(1, core::cmp::min(matcher_window, MAX_BLOCK_SIZE as usize))
    }

    // Obtains an empty buffer with exactly `block_capacity` capacity for the
    // next pending block, recycling the matcher's buffers for compressed levels.
    fn allocate_pending_space(&mut self, block_capacity: usize) -> Vec<u8> {
        let mut space = match self.compression_level {
            CompressionLevel::Fastest
            | CompressionLevel::Default
            | CompressionLevel::Better
            | CompressionLevel::Best
            | CompressionLevel::Level(_) => self.state.matcher.get_next_space(),
            CompressionLevel::Uncompressed => Vec::new(),
        };
        space.clear();
        if space.capacity() > block_capacity {
            space.shrink_to(block_capacity);
        }
        if space.capacity() < block_capacity {
            space.reserve(block_capacity - space.capacity());
        }
        space
    }

    // Emits the pending block if (and only if) it is exactly full.
    //
    // Returns None when there was nothing to emit or emission succeeded;
    // Some(Ok(consumed)) when emission failed but this call already accepted
    // bytes (partial-write semantics; the encoder is poisoned); and Some(Err)
    // when it failed with nothing consumed.
    fn emit_full_pending_block(
        &mut self,
        block_capacity: usize,
        consumed: usize,
    ) -> Option<Result<usize, Error>> {
        if self.pending.len() != block_capacity {
            return None;
        }

        let new_pending = self.allocate_pending_space(block_capacity);
        let full_block = mem::replace(&mut self.pending, new_pending);
        if let Err((err, restored_block)) = self.encode_block(full_block, false) {
            // Keep the unwritten bytes buffered so state stays consistent
            // even though the encoder is now poisoned.
            self.pending = restored_block;
            let err = self.fail(err);
            if consumed > 0 {
                return Some(Ok(consumed));
            }
            return Some(Err(err));
        }
        None
    }

    // Emits whatever is buffered (possibly a partial block). Used by flush()
    // (non-last) and finish() (last).
    fn emit_pending_block(&mut self, last_block: bool) -> Result<(), Error> {
        let block = mem::take(&mut self.pending);
        if let Err((err, restored_block)) = self.encode_block(block, last_block) {
            self.pending = restored_block;
            return Err(self.fail(err));
        }
        if !last_block {
            let block_capacity = self.block_capacity();
            self.pending = self.allocate_pending_space(block_capacity);
        }
        Ok(())
    }

    // Exhaustive match kept intentionally: adding a new CompressionLevel
    // variant will produce a compile error here, forcing the developer to
    // decide whether the streaming encoder supports it before shipping.
    fn ensure_level_supported(&self) -> Result<(), Error> {
        match self.compression_level {
            CompressionLevel::Uncompressed
            | CompressionLevel::Fastest
            | CompressionLevel::Default
            | CompressionLevel::Better
            | CompressionLevel::Best
            | CompressionLevel::Level(_) => Ok(()),
        }
    }

    // Encodes one block (raw or compressed) and writes it to the drain.
    //
    // On failure, returns the error together with the uncompressed bytes so
    // the caller can restore `pending`. The encoded output is staged in
    // `encoded_scratch`, swapped in and out to reuse its allocation.
    fn encode_block(
        &mut self,
        uncompressed_data: Vec<u8>,
        last_block: bool,
    ) -> Result<(), (Error, Vec<u8>)> {
        let mut raw_block = Some(uncompressed_data);
        let mut encoded = Vec::new();
        mem::swap(&mut encoded, &mut self.encoded_scratch);
        encoded.clear();
        // Worst case is a raw block: 3-byte block header plus the payload.
        let needed_capacity = self.block_capacity() + 3;
        if encoded.capacity() < needed_capacity {
            encoded.reserve(needed_capacity.saturating_sub(encoded.len()));
        }
        let mut moved_into_matcher = false;
        if raw_block.as_ref().is_some_and(|block| block.is_empty()) {
            // Zero-length raw block: header only. Terminates a frame with no
            // (remaining) payload.
            let header = BlockHeader {
                last_block,
                block_type: crate::blocks::block::BlockType::Raw,
                block_size: 0,
            };
            header.serialize(&mut encoded);
        } else {
            match self.compression_level {
                CompressionLevel::Uncompressed => {
                    let block = raw_block.as_ref().expect("raw block missing");
                    let header = BlockHeader {
                        last_block,
                        block_type: crate::blocks::block::BlockType::Raw,
                        block_size: block.len() as u32,
                    };
                    header.serialize(&mut encoded);
                    encoded.extend_from_slice(block);
                }
                CompressionLevel::Fastest
                | CompressionLevel::Default
                | CompressionLevel::Better
                | CompressionLevel::Best
                | CompressionLevel::Level(_) => {
                    // Ownership of the block moves into the matcher here; the
                    // bytes remain reachable via get_last_space() afterwards
                    // (the error path below relies on that).
                    let block = raw_block.take().expect("raw block missing");
                    debug_assert!(!block.is_empty(), "empty blocks handled above");
                    compress_block_encoded(
                        &mut self.state,
                        self.compression_level,
                        last_block,
                        block,
                        &mut encoded,
                    );
                    moved_into_matcher = true;
                }
            }
        }

        if let Err(err) = self.drain_mut().and_then(|drain| drain.write_all(&encoded)) {
            // Hand the uncompressed bytes back to the caller; copy them out of
            // the matcher if ownership was already transferred.
            encoded.clear();
            mem::swap(&mut encoded, &mut self.encoded_scratch);
            let restored = if moved_into_matcher {
                self.state.matcher.get_last_space().to_vec()
            } else {
                raw_block.unwrap_or_default()
            };
            return Err((err, restored));
        }

        // Hash only after the block was successfully written, so failed blocks
        // do not contaminate the frame checksum.
        if moved_into_matcher {
            #[cfg(feature = "hash")]
            {
                self.hasher.write(self.state.matcher.get_last_space());
            }
        } else {
            self.hash_block(raw_block.as_deref().unwrap_or(&[]));
        }
        encoded.clear();
        mem::swap(&mut encoded, &mut self.encoded_scratch);
        Ok(())
    }

    // Emits the frame-terminating empty raw block.
    fn write_empty_last_block(&mut self) -> Result<(), Error> {
        self.encode_block(Vec::new(), true).map_err(|(err, _)| err)
    }

    // Poisons the encoder, recording the first failure's kind/message for
    // sticky_error(), and passes the error through unchanged.
    fn fail(&mut self, err: Error) -> Error {
        self.errored = true;
        if self.last_error_kind.is_none() {
            self.last_error_kind = Some(err.kind());
        }
        if self.last_error_message.is_none() {
            self.last_error_message = Some(err.to_string());
        }
        err
    }

    // Folds uncompressed payload bytes into the frame checksum.
    #[cfg(feature = "hash")]
    fn hash_block(&mut self, uncompressed_data: &[u8]) {
        self.hasher.write(uncompressed_data);
    }

    // No-op when the `hash` feature (and thus the checksum) is disabled.
    #[cfg(not(feature = "hash"))]
    fn hash_block(&mut self, _uncompressed_data: &[u8]) {}
}
450
impl<W: Write, M: Matcher> Write for StreamingEncoder<W, M> {
    /// Buffers `buf` into the pending block, emitting full blocks as they fill.
    ///
    /// Returns the number of bytes accepted, which may be less than
    /// `buf.len()` when a pledged content size truncates the accepted slice.
    fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
        self.ensure_open()?;
        if buf.is_empty() {
            return Ok(0);
        }

        // Check pledge before emitting the frame header so that a misuse
        // like set_pledged_content_size(0) + write(non_empty) doesn't leave
        // a partially-written header in the drain.
        if let Some(pledged) = self.pledged_content_size
            && self.bytes_consumed >= pledged
        {
            return Err(invalid_input_error(
                "write would exceed pledged content size",
            ));
        }

        self.ensure_frame_started()?;

        // Enforce pledged upper bound: truncate the accepted slice to the
        // remaining allowance so that partial-write semantics are honored
        // (return Ok(n) with n < buf.len()) instead of failing the full call.
        let buf = if let Some(pledged) = self.pledged_content_size {
            let remaining_allowed = pledged
                .checked_sub(self.bytes_consumed)
                .ok_or_else(|| invalid_input_error("bytes consumed exceed pledged content size"))?;
            if remaining_allowed == 0 {
                return Err(invalid_input_error(
                    "write would exceed pledged content size",
                ));
            }
            let accepted = core::cmp::min(
                buf.len(),
                usize::try_from(remaining_allowed).unwrap_or(usize::MAX),
            );
            &buf[..accepted]
        } else {
            buf
        };

        let block_capacity = self.block_capacity();
        if self.pending.capacity() == 0 {
            self.pending = self.allocate_pending_space(block_capacity);
        }
        let mut remaining = buf;
        let mut consumed = 0usize;

        while !remaining.is_empty() {
            // NOTE(review): unlike the emit below, this early return does not
            // add `consumed` to bytes_consumed. It looks unreachable with
            // consumed > 0 (pending can only be full here on the first
            // iteration, before any bytes of this call were taken) — confirm
            // if emit ordering ever changes.
            if let Some(result) = self.emit_full_pending_block(block_capacity, consumed) {
                return result;
            }

            let available = block_capacity - self.pending.len();
            let to_take = core::cmp::min(remaining.len(), available);
            if to_take == 0 {
                break;
            }
            self.pending.extend_from_slice(&remaining[..to_take]);
            remaining = &remaining[to_take..];
            consumed += to_take;

            // Emit immediately once the pending block is exactly full, so a
            // write landing on a block boundary produces output in the same
            // call.
            if let Some(result) = self.emit_full_pending_block(block_capacity, consumed) {
                if let Ok(n) = &result {
                    self.bytes_consumed += *n as u64;
                }
                return result;
            }
        }
        self.bytes_consumed += consumed as u64;
        Ok(consumed)
    }

    /// Emits the buffered partial block (as non-last) and flushes the drain.
    ///
    /// With nothing buffered this only flushes the drain; notably it does not
    /// start the frame, so a flush before any write emits no header bytes.
    fn flush(&mut self) -> Result<(), Error> {
        self.ensure_open()?;
        if self.pending.is_empty() {
            return self
                .drain_mut()
                .and_then(|drain| drain.flush())
                .map_err(|err| self.fail(err));
        }
        self.ensure_frame_started()?;
        self.emit_pending_block(false)?;
        self.drain_mut()
            .and_then(|drain| drain.flush())
            .map_err(|err| self.fail(err))
    }
}
539
/// Builds an [`Error`] that carries only an [`ErrorKind`], with no message payload.
fn error_from_kind(kind: ErrorKind) -> Error {
    kind.into()
}
543
544fn error_with_kind_message(kind: ErrorKind, message: String) -> Error {
545    #[cfg(feature = "std")]
546    {
547        Error::new(kind, message)
548    }
549    #[cfg(not(feature = "std"))]
550    {
551        Error::new(kind, alloc::boxed::Box::new(message))
552    }
553}
554
555fn invalid_input_error(message: &str) -> Error {
556    #[cfg(feature = "std")]
557    {
558        Error::new(ErrorKind::InvalidInput, message)
559    }
560    #[cfg(not(feature = "std"))]
561    {
562        Error::new(
563            ErrorKind::Other,
564            alloc::boxed::Box::new(alloc::string::String::from(message)),
565        )
566    }
567}
568
569fn other_error_owned(message: String) -> Error {
570    #[cfg(feature = "std")]
571    {
572        Error::other(message)
573    }
574    #[cfg(not(feature = "std"))]
575    {
576        Error::new(ErrorKind::Other, alloc::boxed::Box::new(message))
577    }
578}
579
580fn other_error(message: &str) -> Error {
581    #[cfg(feature = "std")]
582    {
583        Error::other(message)
584    }
585    #[cfg(not(feature = "std"))]
586    {
587        Error::new(
588            ErrorKind::Other,
589            alloc::boxed::Box::new(alloc::string::String::from(message)),
590        )
591    }
592}
593
594#[cfg(test)]
595mod tests {
596    use crate::decoding::StreamingDecoder;
597    use crate::encoding::{CompressionLevel, Matcher, Sequence, StreamingEncoder};
598    use crate::io::{Error, ErrorKind, Read, Write};
599    use alloc::vec;
600    use alloc::vec::Vec;
601
    // Minimal Matcher stub with a configurable window, used to make block
    // sizes fully predictable in tests.
    struct TinyMatcher {
        // Bytes committed by the most recent block.
        last_space: Vec<u8>,
        // Advertised window; also caps the encoder's block capacity.
        window_size: u64,
    }
606
    impl TinyMatcher {
        // `window_size` effectively becomes the test's block capacity.
        fn new(window_size: u64) -> Self {
            Self {
                last_space: Vec::new(),
                window_size,
            }
        }
    }
615
    impl Matcher for TinyMatcher {
        // Hands out a zeroed buffer sized to the window; the encoder clears
        // and refills it before use.
        fn get_next_space(&mut self) -> Vec<u8> {
            vec![0; self.window_size as usize]
        }

        fn get_last_space(&mut self) -> &[u8] {
            self.last_space.as_slice()
        }

        fn commit_space(&mut self, space: Vec<u8>) {
            self.last_space = space;
        }

        fn skip_matching(&mut self) {}

        // Emits the whole committed block as a single literals run; this stub
        // never produces matches.
        fn start_matching(&mut self, mut handle_sequence: impl for<'a> FnMut(Sequence<'a>)) {
            handle_sequence(Sequence::Literals {
                literals: self.last_space.as_slice(),
            });
        }

        fn reset(&mut self, _level: CompressionLevel) {
            self.last_space.clear();
        }

        fn window_size(&self) -> u64 {
            self.window_size
        }
    }
645
    // Writer that fails exactly one write call (selected by ordinal) and
    // records all successful writes in `sink`.
    struct FailingWriteOnce {
        // Number of write() calls seen so far.
        writes: usize,
        // 1-based ordinal of the call that should fail.
        fail_on_write_number: usize,
        // Bytes accepted by successful writes.
        sink: Vec<u8>,
    }
651
    impl FailingWriteOnce {
        // `fail_on_write_number` is 1-based: 1 fails the very first write.
        fn new(fail_on_write_number: usize) -> Self {
            Self {
                writes: 0,
                fail_on_write_number,
                sink: Vec::new(),
            }
        }
    }
661
662    impl Write for FailingWriteOnce {
663        fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
664            self.writes += 1;
665            if self.writes == self.fail_on_write_number {
666                return Err(super::other_error("injected write failure"));
667            }
668            self.sink.extend_from_slice(buf);
669            Ok(buf.len())
670        }
671
672        fn flush(&mut self) -> Result<(), Error> {
673            Ok(())
674        }
675    }
676
    // Writer that fails the N-th write with a caller-chosen ErrorKind and
    // otherwise discards its input.
    struct FailingWithKind {
        // Number of write() calls seen so far.
        writes: usize,
        // 1-based ordinal of the call that should fail.
        fail_on_write_number: usize,
        // Kind reported by the injected failure.
        kind: ErrorKind,
    }
682
    impl FailingWithKind {
        // `fail_on_write_number` is 1-based, matching FailingWriteOnce.
        fn new(fail_on_write_number: usize, kind: ErrorKind) -> Self {
            Self {
                writes: 0,
                fail_on_write_number,
                kind,
            }
        }
    }
692
693    impl Write for FailingWithKind {
694        fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
695            self.writes += 1;
696            if self.writes == self.fail_on_write_number {
697                return Err(Error::from(self.kind));
698            }
699            Ok(buf.len())
700        }
701
702        fn flush(&mut self) -> Result<(), Error> {
703            Ok(())
704        }
705    }
706
    // Writer that accepts only a short prefix of the N-th write and then fails
    // every subsequent call — models a sink dying mid-block.
    struct PartialThenFailWriter {
        // Number of write() calls seen so far.
        writes: usize,
        // 1-based ordinal of the call that is cut short.
        fail_on_write_number: usize,
        // How many bytes of that call are still accepted.
        partial_prefix_len: usize,
        // Once set, all further writes fail unconditionally.
        terminal_failure: bool,
        // Bytes accepted so far.
        sink: Vec<u8>,
    }
714
    impl PartialThenFailWriter {
        // `fail_on_write_number` is 1-based; `partial_prefix_len` bytes of
        // that write are still accepted before the writer turns terminal.
        fn new(fail_on_write_number: usize, partial_prefix_len: usize) -> Self {
            Self {
                writes: 0,
                fail_on_write_number,
                partial_prefix_len,
                terminal_failure: false,
                sink: Vec::new(),
            }
        }
    }
726
    impl Write for PartialThenFailWriter {
        fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
            // After the partial write happened, every further call fails hard.
            if self.terminal_failure {
                return Err(super::other_error("injected terminal write failure"));
            }

            self.writes += 1;
            if self.writes == self.fail_on_write_number {
                // Accept only a prefix this time and arm the terminal failure,
                // so the caller's retry for the remainder errors out.
                let written = core::cmp::min(self.partial_prefix_len, buf.len());
                if written > 0 {
                    self.sink.extend_from_slice(&buf[..written]);
                    self.terminal_failure = true;
                    return Ok(written);
                }
                return Err(super::other_error("injected terminal write failure"));
            }

            self.sink.extend_from_slice(buf);
            Ok(buf.len())
        }

        fn flush(&mut self) -> Result<(), Error> {
            Ok(())
        }
    }
752
753    #[test]
754    fn streaming_encoder_roundtrip_multiple_writes() {
755        let payload = b"streaming-encoder-roundtrip-".repeat(1024);
756        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
757        for chunk in payload.chunks(313) {
758            encoder.write_all(chunk).unwrap();
759        }
760        let compressed = encoder.finish().unwrap();
761
762        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
763        let mut decoded = Vec::new();
764        decoder.read_to_end(&mut decoded).unwrap();
765        assert_eq!(decoded, payload);
766    }
767
    #[test]
    fn flush_emits_nonempty_partial_output() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.write_all(b"partial-block").unwrap();
        encoder.flush().unwrap();
        // flush() must emit the buffered partial block as non-last, so the
        // drain already holds header + block bytes before finish().
        let flushed_len = encoder.get_ref().len();
        assert!(
            flushed_len > 0,
            "flush should emit header+partial block bytes"
        );
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, b"partial-block");
    }
784
785    #[test]
786    fn flush_without_writes_does_not_emit_frame_header() {
787        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
788        encoder.flush().unwrap();
789        assert!(encoder.get_ref().is_empty());
790    }
791
    #[test]
    fn block_boundary_write_emits_block_in_same_call() {
        // TinyMatcher's window of 4 makes the block capacity exactly 4 bytes,
        // so "ABCD" lands precisely on a block boundary while "ABC" does not.
        let mut boundary = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            Vec::new(),
            CompressionLevel::Uncompressed,
        );
        let mut below = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            Vec::new(),
            CompressionLevel::Uncompressed,
        );

        boundary.write_all(b"ABCD").unwrap();
        below.write_all(b"ABC").unwrap();

        let boundary_len = boundary.get_ref().len();
        let below_len = below.get_ref().len();
        assert!(
            boundary_len > below_len,
            "full block should be emitted immediately at block boundary"
        );
    }
815
816    #[test]
817    fn finish_consumes_encoder_and_emits_frame() {
818        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
819        encoder.write_all(b"abc").unwrap();
820        let compressed = encoder.finish().unwrap();
821        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
822        let mut decoded = Vec::new();
823        decoder.read_to_end(&mut decoded).unwrap();
824        assert_eq!(decoded, b"abc");
825    }
826
827    #[test]
828    fn finish_without_writes_emits_empty_frame() {
829        let encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
830        let compressed = encoder.finish().unwrap();
831        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
832        let mut decoded = Vec::new();
833        decoder.read_to_end(&mut decoded).unwrap();
834        assert!(decoded.is_empty());
835    }
836
837    #[test]
838    fn write_empty_buffer_returns_zero() {
839        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
840        assert_eq!(encoder.write(&[]).unwrap(), 0);
841        let _ = encoder.finish().unwrap();
842    }
843
844    #[test]
845    fn uncompressed_level_roundtrip() {
846        let payload = b"uncompressed-streaming-roundtrip".repeat(64);
847        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Uncompressed);
848        for chunk in payload.chunks(41) {
849            encoder.write_all(chunk).unwrap();
850        }
851        let compressed = encoder.finish().unwrap();
852        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
853        let mut decoded = Vec::new();
854        decoder.read_to_end(&mut decoded).unwrap();
855        assert_eq!(decoded, payload);
856    }
857
858    #[test]
859    fn better_level_streaming_roundtrip() {
860        let payload = b"better-level-streaming-test".repeat(256);
861        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Better);
862        for chunk in payload.chunks(53) {
863            encoder.write_all(chunk).unwrap();
864        }
865        let compressed = encoder.finish().unwrap();
866        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
867        let mut decoded = Vec::new();
868        decoder.read_to_end(&mut decoded).unwrap();
869        assert_eq!(decoded, payload);
870    }
871
872    #[test]
873    fn zero_window_matcher_returns_invalid_input_error() {
874        let mut encoder = StreamingEncoder::new_with_matcher(
875            TinyMatcher::new(0),
876            Vec::new(),
877            CompressionLevel::Fastest,
878        );
879        let err = encoder.write_all(b"payload").unwrap_err();
880        assert_eq!(err.kind(), ErrorKind::InvalidInput);
881    }
882
883    #[test]
884    fn best_level_streaming_roundtrip() {
885        // 200 KiB payload crosses the 128 KiB block boundary, exercising
886        // multi-block emission and matcher state carry-over for Best.
887        let payload = b"best-level-streaming-test".repeat(8 * 1024);
888        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Best);
889        for chunk in payload.chunks(53) {
890            encoder.write_all(chunk).unwrap();
891        }
892        let compressed = encoder.finish().unwrap();
893        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
894        let mut decoded = Vec::new();
895        decoder.read_to_end(&mut decoded).unwrap();
896        assert_eq!(decoded, payload);
897    }
898
    #[test]
    fn write_failure_poisoning_is_sticky() {
        // FailingWriteOnce(1) presumably errors on its first underlying write
        // (confirm against the helper's definition above this chunk); once a
        // drain write fails, the encoder must stay poisoned forever.
        let mut encoder = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            FailingWriteOnce::new(1),
            CompressionLevel::Uncompressed,
        );

        // "ABCD" fills the 4-byte matcher window, forcing an emit into the
        // failing drain — so even this first write must error.
        assert!(encoder.write_all(b"ABCD").is_err());
        // Every later operation replays the stored failure ...
        assert!(encoder.flush().is_err());
        assert!(encoder.write_all(b"EFGH").is_err());
        // ... and nothing may have leaked into the drain.
        assert_eq!(encoder.get_ref().sink.len(), 0);
        assert!(encoder.finish().is_err());
    }
913
    #[test]
    fn poisoned_encoder_returns_original_error_kind() {
        // The drain fails its first write with BrokenPipe; the poisoned
        // encoder must remember that kind and replay it on later calls,
        // rather than reporting some generic error kind.
        let mut encoder = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            FailingWithKind::new(1, ErrorKind::BrokenPipe),
            CompressionLevel::Uncompressed,
        );

        // The first failure surfaces the drain's own error kind.
        let first_error = encoder.write_all(b"ABCD").unwrap_err();
        assert_eq!(first_error.kind(), ErrorKind::BrokenPipe);

        // A subsequent write surfaces the same original kind, not a new one.
        let second_error = encoder.write_all(b"EFGH").unwrap_err();
        assert_eq!(second_error.kind(), ErrorKind::BrokenPipe);
    }
928
    #[test]
    fn write_reports_progress_but_poisoning_is_sticky_after_later_block_failure() {
        let payload = b"ABCDEFGHIJKL";
        // The drain fails on its third underlying write — i.e. after two
        // 4-byte blocks have already been emitted successfully.
        let mut encoder = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            FailingWriteOnce::new(3),
            CompressionLevel::Uncompressed,
        );

        // Per the Write contract, the call reports the 8 bytes that made it
        // out before the failing block instead of erroring outright.
        let first_write = encoder.write(payload).unwrap();
        assert_eq!(first_write, 8);
        // Retrying the remainder hits the stored error: poisoning is sticky.
        assert!(encoder.write(&payload[first_write..]).is_err());
        assert!(encoder.flush().is_err());
        assert!(encoder.write_all(b"EFGH").is_err());
    }
944
    #[test]
    fn partial_write_failure_after_progress_poisons_encoder() {
        let payload = b"ABCDEFGHIJKL";
        // PartialThenFailWriter(3, 1) presumably completes two writes, then
        // accepts only part of the third before erroring — confirm against
        // the helper's definition (not visible in this chunk).
        let mut encoder = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            PartialThenFailWriter::new(3, 1),
            CompressionLevel::Uncompressed,
        );

        // Two full 4-byte blocks (8 payload bytes) are reported as progress.
        let first_write = encoder.write(payload).unwrap();
        assert_eq!(first_write, 8);

        // The partially-failed block poisons the encoder for all later calls.
        let second_write = encoder.write(&payload[first_write..]);
        assert!(second_write.is_err());
        assert!(encoder.flush().is_err());
        assert!(encoder.write_all(b"MNOP").is_err());
    }
962
963    #[test]
964    fn new_with_matcher_and_get_mut_work() {
965        let matcher = TinyMatcher::new(128 * 1024);
966        let mut encoder =
967            StreamingEncoder::new_with_matcher(matcher, Vec::new(), CompressionLevel::Fastest);
968        encoder.get_mut().extend_from_slice(b"");
969        encoder.write_all(b"custom-matcher").unwrap();
970        let compressed = encoder.finish().unwrap();
971        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
972        let mut decoded = Vec::new();
973        decoder.read_to_end(&mut decoded).unwrap();
974        assert_eq!(decoded, b"custom-matcher");
975    }
976
977    #[cfg(feature = "std")]
978    #[test]
979    fn streaming_encoder_output_decompresses_with_c_zstd() {
980        let payload = b"tenant=demo op=put key=streaming value=abcdef\n".repeat(4096);
981        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
982        for chunk in payload.chunks(1024) {
983            encoder.write_all(chunk).unwrap();
984        }
985        let compressed = encoder.finish().unwrap();
986
987        let mut decoded = Vec::with_capacity(payload.len());
988        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
989        assert_eq!(decoded, payload);
990    }
991
992    #[test]
993    fn pledged_content_size_written_in_header() {
994        let payload = b"hello world, pledged size test";
995        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
996        encoder
997            .set_pledged_content_size(payload.len() as u64)
998            .unwrap();
999        encoder.write_all(payload).unwrap();
1000        let compressed = encoder.finish().unwrap();
1001
1002        // Verify FCS is present and correct
1003        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1004            .unwrap()
1005            .0;
1006        assert_eq!(header.frame_content_size(), payload.len() as u64);
1007
1008        // Verify roundtrip
1009        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1010        let mut decoded = Vec::new();
1011        decoder.read_to_end(&mut decoded).unwrap();
1012        assert_eq!(decoded, payload);
1013    }
1014
1015    #[test]
1016    fn pledged_content_size_mismatch_returns_error() {
1017        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1018        encoder.set_pledged_content_size(100).unwrap();
1019        encoder.write_all(b"short payload").unwrap(); // 13 bytes != 100 pledged
1020        let err = encoder.finish().unwrap_err();
1021        assert_eq!(err.kind(), ErrorKind::InvalidInput);
1022    }
1023
1024    #[test]
1025    fn write_exceeding_pledge_returns_error() {
1026        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1027        encoder.set_pledged_content_size(5).unwrap();
1028        let err = encoder.write_all(b"exceeds five bytes").unwrap_err();
1029        assert_eq!(err.kind(), ErrorKind::InvalidInput);
1030    }
1031
1032    #[test]
1033    fn write_straddling_pledge_reports_partial_progress() {
1034        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1035        encoder.set_pledged_content_size(5).unwrap();
1036        // write() should accept exactly 5 bytes (partial progress)
1037        assert_eq!(encoder.write(b"abcdef").unwrap(), 5);
1038        // Next write should fail — pledge exhausted
1039        let err = encoder.write(b"g").unwrap_err();
1040        assert_eq!(err.kind(), ErrorKind::InvalidInput);
1041    }
1042
    #[test]
    fn encoded_scratch_capacity_is_reused_across_blocks() {
        // Push three 64-byte blocks through a 64-byte matcher window and
        // check the internal scratch Vec keeps its allocation between emits.
        let payload = vec![0xAB; 64 * 3];
        let mut encoder = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(64),
            Vec::new(),
            CompressionLevel::Uncompressed,
        );

        encoder.write_all(&payload[..64]).unwrap();
        let first_capacity = encoder.encoded_scratch.capacity();
        // 67 = 3-byte block header + 64-byte uncompressed block payload.
        assert!(
            first_capacity >= 67,
            "expected encoded scratch to keep block header + payload capacity",
        );

        encoder.write_all(&payload[64..128]).unwrap();
        let second_capacity = encoder.encoded_scratch.capacity();
        // Capacity must never shrink across block emits.
        assert!(
            second_capacity >= first_capacity,
            "encoded scratch capacity should be reused across block emits",
        );

        // Finally make sure buffer reuse did not corrupt the output frame.
        encoder.write_all(&payload[128..]).unwrap();
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }
1073
1074    #[test]
1075    fn pledged_content_size_after_write_returns_error() {
1076        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1077        encoder.write_all(b"already writing").unwrap();
1078        let err = encoder.set_pledged_content_size(15).unwrap_err();
1079        assert_eq!(err.kind(), ErrorKind::InvalidInput);
1080    }
1081
    #[test]
    fn source_size_hint_directly_reduces_window_header() {
        let payload = b"streaming-source-size-hint".repeat(64);

        // Baseline: encode without a hint and record the advertised window.
        let mut no_hint = StreamingEncoder::new(Vec::new(), CompressionLevel::from_level(11));
        no_hint.write_all(payload.as_slice()).unwrap();
        let no_hint_frame = no_hint.finish().unwrap();
        let no_hint_header = crate::decoding::frame::read_frame_header(no_hint_frame.as_slice())
            .unwrap()
            .0;
        let no_hint_window = no_hint_header.window_size().unwrap();

        // Hinted encoder: the hint must be set before any write ...
        let mut with_hint = StreamingEncoder::new(Vec::new(), CompressionLevel::from_level(11));
        with_hint
            .set_source_size_hint(payload.len() as u64)
            .unwrap();
        with_hint.write_all(payload.as_slice()).unwrap();
        // ... and setting it again after writing must be rejected.
        let late_hint_err = with_hint
            .set_source_size_hint(payload.len() as u64)
            .unwrap_err();
        assert_eq!(late_hint_err.kind(), ErrorKind::InvalidInput);
        let with_hint_frame = with_hint.finish().unwrap();
        let with_hint_header =
            crate::decoding::frame::read_frame_header(with_hint_frame.as_slice())
                .unwrap()
                .0;
        let with_hint_window = with_hint_header.window_size().unwrap();

        // The hint may shrink the advertised window, never grow it.
        assert!(
            with_hint_window <= no_hint_window,
            "source size hint should not increase advertised window"
        );

        // The hinted frame must still decode back to the original payload.
        let mut decoder = StreamingDecoder::new(with_hint_frame.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }
1120
1121    #[cfg(feature = "std")]
1122    #[test]
1123    fn pledged_content_size_c_zstd_compatible() {
1124        let payload = b"tenant=demo op=put key=streaming value=abcdef\n".repeat(4096);
1125        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1126        encoder
1127            .set_pledged_content_size(payload.len() as u64)
1128            .unwrap();
1129        for chunk in payload.chunks(1024) {
1130            encoder.write_all(chunk).unwrap();
1131        }
1132        let compressed = encoder.finish().unwrap();
1133
1134        // FCS should be written
1135        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1136            .unwrap()
1137            .0;
1138        assert_eq!(header.frame_content_size(), payload.len() as u64);
1139
1140        // C zstd should decompress successfully
1141        let mut decoded = Vec::new();
1142        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
1143        assert_eq!(decoded, payload);
1144    }
1145
    #[test]
    fn single_segment_requires_pledged_to_fit_matcher_window() {
        // The pledged size (1320 bytes) exceeds the 1024-byte matcher window,
        // so the encoder must not advertise a single-segment frame.
        let payload = b"streaming-window-gate-".repeat(60); // 1320 bytes
        let mut encoder = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(1024),
            Vec::new(),
            CompressionLevel::Fastest,
        );
        encoder
            .set_pledged_content_size(payload.len() as u64)
            .unwrap();
        encoder.write_all(payload.as_slice()).unwrap();
        let compressed = encoder.finish().unwrap();

        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
            .unwrap()
            .0;
        // The FCS is still written even with single-segment disabled.
        assert_eq!(header.frame_content_size(), payload.len() as u64);
        assert!(
            !header.descriptor.single_segment_flag(),
            "single-segment must stay off when pledged content size exceeds matcher window"
        );
        // With single-segment off, a window descriptor covering at least the
        // matcher window must be present.
        assert!(
            header.window_size().unwrap() >= 1024,
            "window descriptor should be present when single-segment is disabled"
        );
    }
1173
1174    #[test]
1175    fn no_pledged_size_omits_fcs_from_header() {
1176        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1177        encoder.write_all(b"no pledged size").unwrap();
1178        let compressed = encoder.finish().unwrap();
1179
1180        // FCS should be omitted from the header; the decoder reports absent FCS as 0.
1181        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1182            .unwrap()
1183            .0;
1184        assert_eq!(header.frame_content_size(), 0);
1185        // Verify the descriptor confirms FCS field is truly absent (0 bytes),
1186        // not just FCS present with value 0.
1187        assert_eq!(header.descriptor.frame_content_size_bytes().unwrap(), 0);
1188    }
1189}