Skip to main content

structured_zstd/encoding/
streaming_encoder.rs

1use alloc::format;
2use alloc::string::{String, ToString};
3use alloc::vec::Vec;
4use core::mem;
5
6use crate::common::MAX_BLOCK_SIZE;
7#[cfg(feature = "hash")]
8use core::hash::Hasher;
9#[cfg(feature = "hash")]
10use twox_hash::XxHash64;
11
12use crate::encoding::levels::compress_block_encoded;
13use crate::encoding::{
14    CompressionLevel, MatchGeneratorDriver, Matcher, block_header::BlockHeader,
15    frame_compressor::CompressState, frame_compressor::FseTables, frame_header::FrameHeader,
16};
17use crate::io::{Error, ErrorKind, Write};
18
19/// Incremental frame encoder that implements [`Write`].
20///
21/// Data can be provided with multiple `write()` calls. Full blocks are compressed
22/// automatically, `flush()` emits the currently buffered partial block as non-last,
23/// and `finish()` closes the frame and returns the wrapped writer.
24pub struct StreamingEncoder<W: Write, M: Matcher = MatchGeneratorDriver> {
25    drain: Option<W>,
26    compression_level: CompressionLevel,
27    state: CompressState<M>,
28    pending: Vec<u8>,
29    encoded_scratch: Vec<u8>,
30    errored: bool,
31    last_error_kind: Option<ErrorKind>,
32    last_error_message: Option<String>,
33    frame_started: bool,
34    pledged_content_size: Option<u64>,
35    bytes_consumed: u64,
36    #[cfg(feature = "hash")]
37    hasher: XxHash64,
38}
39
40impl<W: Write> StreamingEncoder<W, MatchGeneratorDriver> {
41    /// Creates a streaming encoder backed by the default match generator.
42    ///
43    /// The encoder writes compressed bytes into `drain` and applies `compression_level`
44    /// to all subsequently written blocks.
45    pub fn new(drain: W, compression_level: CompressionLevel) -> Self {
46        Self::new_with_matcher(
47            MatchGeneratorDriver::new(MAX_BLOCK_SIZE as usize, 1),
48            drain,
49            compression_level,
50        )
51    }
52}
53
54impl<W: Write, M: Matcher> StreamingEncoder<W, M> {
55    /// Creates a streaming encoder with an explicitly provided matcher implementation.
56    ///
57    /// This constructor is primarily intended for tests and advanced callers that need
58    /// custom match-window behavior.
59    pub fn new_with_matcher(matcher: M, drain: W, compression_level: CompressionLevel) -> Self {
60        Self {
61            drain: Some(drain),
62            compression_level,
63            state: CompressState {
64                matcher,
65                last_huff_table: None,
66                fse_tables: FseTables::new(),
67                block_scratch: crate::encoding::blocks::CompressedBlockScratch::new(),
68                offset_hist: [1, 4, 8],
69            },
70            pending: Vec::new(),
71            encoded_scratch: Vec::new(),
72            errored: false,
73            last_error_kind: None,
74            last_error_message: None,
75            frame_started: false,
76            pledged_content_size: None,
77            bytes_consumed: 0,
78            #[cfg(feature = "hash")]
79            hasher: XxHash64::with_seed(0),
80        }
81    }
82
83    /// Pledge the total uncompressed content size for this frame.
84    ///
85    /// When set, the frame header will include a `Frame_Content_Size` field.
86    /// This enables decoders to pre-allocate output buffers.
87    /// The pledged size is also forwarded as a source-size hint to the
88    /// matcher so small inputs can use smaller matching tables.
89    ///
90    /// Must be called **before** the first [`write`](Write::write) call;
91    /// calling it after the frame header has already been emitted returns an
92    /// error.
93    pub fn set_pledged_content_size(&mut self, size: u64) -> Result<(), Error> {
94        self.ensure_open()?;
95        if self.frame_started {
96            return Err(invalid_input_error(
97                "pledged content size must be set before the first write",
98            ));
99        }
100        self.pledged_content_size = Some(size);
101        // Also use pledged size as source-size hint so the matcher
102        // can select smaller tables for small inputs.
103        self.state.matcher.set_source_size_hint(size);
104        Ok(())
105    }
106
107    /// Provide a hint about the total uncompressed size for the next frame.
108    ///
109    /// Unlike [`set_pledged_content_size`](Self::set_pledged_content_size),
110    /// this does **not** enforce that exactly `size` bytes are written; it
111    /// may reduce matcher tables, advertised frame window, and block sizing
112    /// for small inputs. Must be called before the first
113    /// [`write`](Write::write).
114    pub fn set_source_size_hint(&mut self, size: u64) -> Result<(), Error> {
115        self.ensure_open()?;
116        if self.frame_started {
117            return Err(invalid_input_error(
118                "source size hint must be set before the first write",
119            ));
120        }
121        self.state.matcher.set_source_size_hint(size);
122        Ok(())
123    }
124
125    /// Returns an immutable reference to the wrapped output drain.
126    ///
127    /// The drain remains available for the encoder lifetime; [`finish`](Self::finish)
128    /// consumes the encoder and returns ownership of the drain.
129    pub fn get_ref(&self) -> &W {
130        self.drain
131            .as_ref()
132            .expect("streaming encoder drain is present until finish consumes self")
133    }
134
135    /// Returns a mutable reference to the wrapped output drain.
136    ///
137    /// It is inadvisable to directly write to the underlying writer, as doing
138    /// so would corrupt the zstd frame being assembled by the encoder.
139    ///
140    /// The drain remains available for the encoder lifetime; [`finish`](Self::finish)
141    /// consumes the encoder and returns ownership of the drain.
142    pub fn get_mut(&mut self) -> &mut W {
143        self.drain
144            .as_mut()
145            .expect("streaming encoder drain is present until finish consumes self")
146    }
147
148    /// Finalizes the current zstd frame and returns the wrapped output drain.
149    ///
150    /// If no payload was written yet, this still emits a valid empty frame.
151    /// Calling this method consumes the encoder.
152    pub fn finish(mut self) -> Result<W, Error> {
153        self.ensure_open()?;
154
155        // Validate the pledge before finalizing the frame. If finish() is
156        // called before any writes, this also avoids emitting a header with
157        // an incorrect FCS into the drain on mismatch.
158        if let Some(pledged) = self.pledged_content_size
159            && self.bytes_consumed != pledged
160        {
161            return Err(invalid_input_error(
162                "pledged content size does not match bytes consumed",
163            ));
164        }
165
166        self.ensure_frame_started()?;
167
168        if self.pending.is_empty() {
169            self.write_empty_last_block()
170                .map_err(|err| self.fail(err))?;
171        } else {
172            self.emit_pending_block(true)?;
173        }
174
175        let mut drain = self
176            .drain
177            .take()
178            .expect("streaming encoder drain must be present when finishing");
179
180        #[cfg(feature = "hash")]
181        {
182            let checksum = self.hasher.finish() as u32;
183            drain
184                .write_all(&checksum.to_le_bytes())
185                .map_err(|err| self.fail(err))?;
186        }
187
188        drain.flush().map_err(|err| self.fail(err))?;
189        Ok(drain)
190    }
191
192    fn ensure_open(&self) -> Result<(), Error> {
193        if self.errored {
194            return Err(self.sticky_error());
195        }
196        Ok(())
197    }
198
199    // Cold path (only reached after poisoning). The format!() calls still allocate
200    // in no_std even though error_with_kind_message/other_error_owned drop the
201    // message; this is acceptable on an error recovery path to keep match arms simple.
202    fn sticky_error(&self) -> Error {
203        match (self.last_error_kind, self.last_error_message.as_deref()) {
204            (Some(kind), Some(message)) => error_with_kind_message(
205                kind,
206                format!(
207                    "streaming encoder is in an errored state due to previous {kind:?} failure: {message}"
208                ),
209            ),
210            (Some(kind), None) => error_from_kind(kind),
211            (None, Some(message)) => other_error_owned(format!(
212                "streaming encoder is in an errored state: {message}"
213            )),
214            (None, None) => other_error("streaming encoder is in an errored state"),
215        }
216    }
217
218    fn drain_mut(&mut self) -> Result<&mut W, Error> {
219        self.drain
220            .as_mut()
221            .ok_or_else(|| other_error("streaming encoder has no active drain"))
222    }
223
224    fn ensure_frame_started(&mut self) -> Result<(), Error> {
225        if self.frame_started {
226            return Ok(());
227        }
228
229        self.ensure_level_supported()?;
230        self.state.matcher.reset(self.compression_level);
231        self.state.offset_hist = [1, 4, 8];
232        self.state.last_huff_table = None;
233        self.state.fse_tables.ll_previous = None;
234        self.state.fse_tables.ml_previous = None;
235        self.state.fse_tables.of_previous = None;
236        #[cfg(feature = "hash")]
237        {
238            self.hasher = XxHash64::with_seed(0);
239        }
240
241        let window_size = self.state.matcher.window_size();
242        if window_size == 0 {
243            return Err(invalid_input_error(
244                "matcher reported window_size == 0, which is invalid",
245            ));
246        }
247
248        // FrameCompressor gates single-segment on dictionary usage state; the
249        // streaming encoder currently has no dictionary API/state, so we only
250        // gate on pledged size and window reach here.
251        // TODO: if streaming dictionary support is added, mirror the
252        // !use_dictionary_state guard from FrameCompressor.
253        let single_segment = self
254            .pledged_content_size
255            .map(|size| (512..=(1 << 14)).contains(&size) && size <= window_size)
256            .unwrap_or(false);
257
258        let header = FrameHeader {
259            frame_content_size: self.pledged_content_size,
260            single_segment,
261            content_checksum: cfg!(feature = "hash"),
262            dictionary_id: None,
263            window_size: if single_segment {
264                None
265            } else {
266                Some(window_size)
267            },
268        };
269        let mut encoded_header = Vec::new();
270        header.serialize(&mut encoded_header);
271        self.drain_mut()
272            .and_then(|drain| drain.write_all(&encoded_header))
273            .map_err(|err| self.fail(err))?;
274
275        self.frame_started = true;
276        Ok(())
277    }
278
279    fn block_capacity(&self) -> usize {
280        let matcher_window = self.state.matcher.window_size() as usize;
281        core::cmp::max(1, core::cmp::min(matcher_window, MAX_BLOCK_SIZE as usize))
282    }
283
284    fn allocate_pending_space(&mut self, block_capacity: usize) -> Vec<u8> {
285        let mut space = match self.compression_level {
286            CompressionLevel::Fastest
287            | CompressionLevel::Default
288            | CompressionLevel::Better
289            | CompressionLevel::Best
290            | CompressionLevel::Level(_) => self.state.matcher.get_next_space(),
291            CompressionLevel::Uncompressed => Vec::new(),
292        };
293        space.clear();
294        if space.capacity() > block_capacity {
295            space.shrink_to(block_capacity);
296        }
297        if space.capacity() < block_capacity {
298            space.reserve(block_capacity - space.capacity());
299        }
300        space
301    }
302
303    fn emit_full_pending_block(
304        &mut self,
305        block_capacity: usize,
306        consumed: usize,
307    ) -> Option<Result<usize, Error>> {
308        if self.pending.len() != block_capacity {
309            return None;
310        }
311
312        let new_pending = self.allocate_pending_space(block_capacity);
313        let full_block = mem::replace(&mut self.pending, new_pending);
314        if let Err((err, restored_block)) = self.encode_block(full_block, false) {
315            self.pending = restored_block;
316            let err = self.fail(err);
317            if consumed > 0 {
318                return Some(Ok(consumed));
319            }
320            return Some(Err(err));
321        }
322        None
323    }
324
325    fn emit_pending_block(&mut self, last_block: bool) -> Result<(), Error> {
326        let block = mem::take(&mut self.pending);
327        if let Err((err, restored_block)) = self.encode_block(block, last_block) {
328            self.pending = restored_block;
329            return Err(self.fail(err));
330        }
331        if !last_block {
332            let block_capacity = self.block_capacity();
333            self.pending = self.allocate_pending_space(block_capacity);
334        }
335        Ok(())
336    }
337
338    // Exhaustive match kept intentionally: adding a new CompressionLevel
339    // variant will produce a compile error here, forcing the developer to
340    // decide whether the streaming encoder supports it before shipping.
341    fn ensure_level_supported(&self) -> Result<(), Error> {
342        match self.compression_level {
343            CompressionLevel::Uncompressed
344            | CompressionLevel::Fastest
345            | CompressionLevel::Default
346            | CompressionLevel::Better
347            | CompressionLevel::Best
348            | CompressionLevel::Level(_) => Ok(()),
349        }
350    }
351
352    fn encode_block(
353        &mut self,
354        uncompressed_data: Vec<u8>,
355        last_block: bool,
356    ) -> Result<(), (Error, Vec<u8>)> {
357        let mut raw_block = Some(uncompressed_data);
358        let mut encoded = Vec::new();
359        mem::swap(&mut encoded, &mut self.encoded_scratch);
360        encoded.clear();
361        let needed_capacity = self.block_capacity() + 3;
362        if encoded.capacity() < needed_capacity {
363            encoded.reserve(needed_capacity.saturating_sub(encoded.len()));
364        }
365        let mut moved_into_matcher = false;
366        if raw_block.as_ref().is_some_and(|block| block.is_empty()) {
367            let header = BlockHeader {
368                last_block,
369                block_type: crate::blocks::block::BlockType::Raw,
370                block_size: 0,
371            };
372            header.serialize(&mut encoded);
373        } else {
374            match self.compression_level {
375                CompressionLevel::Uncompressed => {
376                    let block = raw_block.as_ref().expect("raw block missing");
377                    let header = BlockHeader {
378                        last_block,
379                        block_type: crate::blocks::block::BlockType::Raw,
380                        block_size: block.len() as u32,
381                    };
382                    header.serialize(&mut encoded);
383                    encoded.extend_from_slice(block);
384                }
385                CompressionLevel::Fastest
386                | CompressionLevel::Default
387                | CompressionLevel::Better
388                | CompressionLevel::Best
389                | CompressionLevel::Level(_) => {
390                    let block = raw_block.take().expect("raw block missing");
391                    debug_assert!(!block.is_empty(), "empty blocks handled above");
392                    compress_block_encoded(
393                        &mut self.state,
394                        self.compression_level,
395                        last_block,
396                        block,
397                        &mut encoded,
398                    );
399                    moved_into_matcher = true;
400                }
401            }
402        }
403
404        if let Err(err) = self.drain_mut().and_then(|drain| drain.write_all(&encoded)) {
405            encoded.clear();
406            mem::swap(&mut encoded, &mut self.encoded_scratch);
407            let restored = if moved_into_matcher {
408                self.state.matcher.get_last_space().to_vec()
409            } else {
410                raw_block.unwrap_or_default()
411            };
412            return Err((err, restored));
413        }
414
415        if moved_into_matcher {
416            #[cfg(feature = "hash")]
417            {
418                self.hasher.write(self.state.matcher.get_last_space());
419            }
420        } else {
421            self.hash_block(raw_block.as_deref().unwrap_or(&[]));
422        }
423        encoded.clear();
424        mem::swap(&mut encoded, &mut self.encoded_scratch);
425        Ok(())
426    }
427
428    fn write_empty_last_block(&mut self) -> Result<(), Error> {
429        self.encode_block(Vec::new(), true).map_err(|(err, _)| err)
430    }
431
432    fn fail(&mut self, err: Error) -> Error {
433        self.errored = true;
434        if self.last_error_kind.is_none() {
435            self.last_error_kind = Some(err.kind());
436        }
437        if self.last_error_message.is_none() {
438            self.last_error_message = Some(err.to_string());
439        }
440        err
441    }
442
443    #[cfg(feature = "hash")]
444    fn hash_block(&mut self, uncompressed_data: &[u8]) {
445        self.hasher.write(uncompressed_data);
446    }
447
448    #[cfg(not(feature = "hash"))]
449    fn hash_block(&mut self, _uncompressed_data: &[u8]) {}
450}
451
452impl<W: Write, M: Matcher> Write for StreamingEncoder<W, M> {
453    fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
454        self.ensure_open()?;
455        if buf.is_empty() {
456            return Ok(0);
457        }
458
459        // Check pledge before emitting the frame header so that a misuse
460        // like set_pledged_content_size(0) + write(non_empty) doesn't leave
461        // a partially-written header in the drain.
462        if let Some(pledged) = self.pledged_content_size
463            && self.bytes_consumed >= pledged
464        {
465            return Err(invalid_input_error(
466                "write would exceed pledged content size",
467            ));
468        }
469
470        self.ensure_frame_started()?;
471
472        // Enforce pledged upper bound: truncate the accepted slice to the
473        // remaining allowance so that partial-write semantics are honored
474        // (return Ok(n) with n < buf.len()) instead of failing the full call.
475        let buf = if let Some(pledged) = self.pledged_content_size {
476            let remaining_allowed = pledged
477                .checked_sub(self.bytes_consumed)
478                .ok_or_else(|| invalid_input_error("bytes consumed exceed pledged content size"))?;
479            if remaining_allowed == 0 {
480                return Err(invalid_input_error(
481                    "write would exceed pledged content size",
482                ));
483            }
484            let accepted = core::cmp::min(
485                buf.len(),
486                usize::try_from(remaining_allowed).unwrap_or(usize::MAX),
487            );
488            &buf[..accepted]
489        } else {
490            buf
491        };
492
493        let block_capacity = self.block_capacity();
494        if self.pending.capacity() == 0 {
495            self.pending = self.allocate_pending_space(block_capacity);
496        }
497        let mut remaining = buf;
498        let mut consumed = 0usize;
499
500        while !remaining.is_empty() {
501            if let Some(result) = self.emit_full_pending_block(block_capacity, consumed) {
502                return result;
503            }
504
505            let available = block_capacity - self.pending.len();
506            let to_take = core::cmp::min(remaining.len(), available);
507            if to_take == 0 {
508                break;
509            }
510            self.pending.extend_from_slice(&remaining[..to_take]);
511            remaining = &remaining[to_take..];
512            consumed += to_take;
513
514            if let Some(result) = self.emit_full_pending_block(block_capacity, consumed) {
515                if let Ok(n) = &result {
516                    self.bytes_consumed += *n as u64;
517                }
518                return result;
519            }
520        }
521        self.bytes_consumed += consumed as u64;
522        Ok(consumed)
523    }
524
525    fn flush(&mut self) -> Result<(), Error> {
526        self.ensure_open()?;
527        if self.pending.is_empty() {
528            return self
529                .drain_mut()
530                .and_then(|drain| drain.flush())
531                .map_err(|err| self.fail(err));
532        }
533        self.ensure_frame_started()?;
534        self.emit_pending_block(false)?;
535        self.drain_mut()
536            .and_then(|drain| drain.flush())
537            .map_err(|err| self.fail(err))
538    }
539}
540
541fn error_from_kind(kind: ErrorKind) -> Error {
542    Error::from(kind)
543}
544
545fn error_with_kind_message(kind: ErrorKind, message: String) -> Error {
546    #[cfg(feature = "std")]
547    {
548        Error::new(kind, message)
549    }
550    #[cfg(not(feature = "std"))]
551    {
552        Error::new(kind, alloc::boxed::Box::new(message))
553    }
554}
555
556fn invalid_input_error(message: &str) -> Error {
557    #[cfg(feature = "std")]
558    {
559        Error::new(ErrorKind::InvalidInput, message)
560    }
561    #[cfg(not(feature = "std"))]
562    {
563        Error::new(
564            ErrorKind::Other,
565            alloc::boxed::Box::new(alloc::string::String::from(message)),
566        )
567    }
568}
569
570fn other_error_owned(message: String) -> Error {
571    #[cfg(feature = "std")]
572    {
573        Error::other(message)
574    }
575    #[cfg(not(feature = "std"))]
576    {
577        Error::new(ErrorKind::Other, alloc::boxed::Box::new(message))
578    }
579}
580
581fn other_error(message: &str) -> Error {
582    #[cfg(feature = "std")]
583    {
584        Error::other(message)
585    }
586    #[cfg(not(feature = "std"))]
587    {
588        Error::new(
589            ErrorKind::Other,
590            alloc::boxed::Box::new(alloc::string::String::from(message)),
591        )
592    }
593}
594
595#[cfg(test)]
596mod tests {
597    use crate::decoding::StreamingDecoder;
598    use crate::encoding::{CompressionLevel, Matcher, Sequence, StreamingEncoder};
599    use crate::io::{Error, ErrorKind, Read, Write};
600    use alloc::vec;
601    use alloc::vec::Vec;
602
603    struct TinyMatcher {
604        last_space: Vec<u8>,
605        window_size: u64,
606    }
607
608    impl TinyMatcher {
609        fn new(window_size: u64) -> Self {
610            Self {
611                last_space: Vec::new(),
612                window_size,
613            }
614        }
615    }
616
617    impl Matcher for TinyMatcher {
618        fn get_next_space(&mut self) -> Vec<u8> {
619            vec![0; self.window_size as usize]
620        }
621
622        fn get_last_space(&mut self) -> &[u8] {
623            self.last_space.as_slice()
624        }
625
626        fn commit_space(&mut self, space: Vec<u8>) {
627            self.last_space = space;
628        }
629
630        fn skip_matching(&mut self) {}
631
632        fn start_matching(&mut self, mut handle_sequence: impl for<'a> FnMut(Sequence<'a>)) {
633            handle_sequence(Sequence::Literals {
634                literals: self.last_space.as_slice(),
635            });
636        }
637
638        fn reset(&mut self, _level: CompressionLevel) {
639            self.last_space.clear();
640        }
641
642        fn window_size(&self) -> u64 {
643            self.window_size
644        }
645    }
646
647    struct FailingWriteOnce {
648        writes: usize,
649        fail_on_write_number: usize,
650        sink: Vec<u8>,
651    }
652
653    impl FailingWriteOnce {
654        fn new(fail_on_write_number: usize) -> Self {
655            Self {
656                writes: 0,
657                fail_on_write_number,
658                sink: Vec::new(),
659            }
660        }
661    }
662
663    impl Write for FailingWriteOnce {
664        fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
665            self.writes += 1;
666            if self.writes == self.fail_on_write_number {
667                return Err(super::other_error("injected write failure"));
668            }
669            self.sink.extend_from_slice(buf);
670            Ok(buf.len())
671        }
672
673        fn flush(&mut self) -> Result<(), Error> {
674            Ok(())
675        }
676    }
677
678    struct FailingWithKind {
679        writes: usize,
680        fail_on_write_number: usize,
681        kind: ErrorKind,
682    }
683
684    impl FailingWithKind {
685        fn new(fail_on_write_number: usize, kind: ErrorKind) -> Self {
686            Self {
687                writes: 0,
688                fail_on_write_number,
689                kind,
690            }
691        }
692    }
693
694    impl Write for FailingWithKind {
695        fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
696            self.writes += 1;
697            if self.writes == self.fail_on_write_number {
698                return Err(Error::from(self.kind));
699            }
700            Ok(buf.len())
701        }
702
703        fn flush(&mut self) -> Result<(), Error> {
704            Ok(())
705        }
706    }
707
708    struct PartialThenFailWriter {
709        writes: usize,
710        fail_on_write_number: usize,
711        partial_prefix_len: usize,
712        terminal_failure: bool,
713        sink: Vec<u8>,
714    }
715
716    impl PartialThenFailWriter {
717        fn new(fail_on_write_number: usize, partial_prefix_len: usize) -> Self {
718            Self {
719                writes: 0,
720                fail_on_write_number,
721                partial_prefix_len,
722                terminal_failure: false,
723                sink: Vec::new(),
724            }
725        }
726    }
727
728    impl Write for PartialThenFailWriter {
729        fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
730            if self.terminal_failure {
731                return Err(super::other_error("injected terminal write failure"));
732            }
733
734            self.writes += 1;
735            if self.writes == self.fail_on_write_number {
736                let written = core::cmp::min(self.partial_prefix_len, buf.len());
737                if written > 0 {
738                    self.sink.extend_from_slice(&buf[..written]);
739                    self.terminal_failure = true;
740                    return Ok(written);
741                }
742                return Err(super::other_error("injected terminal write failure"));
743            }
744
745            self.sink.extend_from_slice(buf);
746            Ok(buf.len())
747        }
748
749        fn flush(&mut self) -> Result<(), Error> {
750            Ok(())
751        }
752    }
753
754    #[test]
755    fn streaming_encoder_roundtrip_multiple_writes() {
756        let payload = b"streaming-encoder-roundtrip-".repeat(1024);
757        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
758        for chunk in payload.chunks(313) {
759            encoder.write_all(chunk).unwrap();
760        }
761        let compressed = encoder.finish().unwrap();
762
763        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
764        let mut decoded = Vec::new();
765        decoder.read_to_end(&mut decoded).unwrap();
766        assert_eq!(decoded, payload);
767    }
768
769    #[test]
770    fn flush_emits_nonempty_partial_output() {
771        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
772        encoder.write_all(b"partial-block").unwrap();
773        encoder.flush().unwrap();
774        let flushed_len = encoder.get_ref().len();
775        assert!(
776            flushed_len > 0,
777            "flush should emit header+partial block bytes"
778        );
779        let compressed = encoder.finish().unwrap();
780        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
781        let mut decoded = Vec::new();
782        decoder.read_to_end(&mut decoded).unwrap();
783        assert_eq!(decoded, b"partial-block");
784    }
785
786    #[test]
787    fn flush_without_writes_does_not_emit_frame_header() {
788        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
789        encoder.flush().unwrap();
790        assert!(encoder.get_ref().is_empty());
791    }
792
793    #[test]
794    fn block_boundary_write_emits_block_in_same_call() {
795        let mut boundary = StreamingEncoder::new_with_matcher(
796            TinyMatcher::new(4),
797            Vec::new(),
798            CompressionLevel::Uncompressed,
799        );
800        let mut below = StreamingEncoder::new_with_matcher(
801            TinyMatcher::new(4),
802            Vec::new(),
803            CompressionLevel::Uncompressed,
804        );
805
806        boundary.write_all(b"ABCD").unwrap();
807        below.write_all(b"ABC").unwrap();
808
809        let boundary_len = boundary.get_ref().len();
810        let below_len = below.get_ref().len();
811        assert!(
812            boundary_len > below_len,
813            "full block should be emitted immediately at block boundary"
814        );
815    }
816
817    #[test]
818    fn finish_consumes_encoder_and_emits_frame() {
819        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
820        encoder.write_all(b"abc").unwrap();
821        let compressed = encoder.finish().unwrap();
822        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
823        let mut decoded = Vec::new();
824        decoder.read_to_end(&mut decoded).unwrap();
825        assert_eq!(decoded, b"abc");
826    }
827
828    #[test]
829    fn finish_without_writes_emits_empty_frame() {
830        let encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
831        let compressed = encoder.finish().unwrap();
832        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
833        let mut decoded = Vec::new();
834        decoder.read_to_end(&mut decoded).unwrap();
835        assert!(decoded.is_empty());
836    }
837
838    #[test]
839    fn write_empty_buffer_returns_zero() {
840        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
841        assert_eq!(encoder.write(&[]).unwrap(), 0);
842        let _ = encoder.finish().unwrap();
843    }
844
845    #[test]
846    fn uncompressed_level_roundtrip() {
847        let payload = b"uncompressed-streaming-roundtrip".repeat(64);
848        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Uncompressed);
849        for chunk in payload.chunks(41) {
850            encoder.write_all(chunk).unwrap();
851        }
852        let compressed = encoder.finish().unwrap();
853        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
854        let mut decoded = Vec::new();
855        decoder.read_to_end(&mut decoded).unwrap();
856        assert_eq!(decoded, payload);
857    }
858
859    #[test]
860    fn better_level_streaming_roundtrip() {
861        let payload = b"better-level-streaming-test".repeat(256);
862        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Better);
863        for chunk in payload.chunks(53) {
864            encoder.write_all(chunk).unwrap();
865        }
866        let compressed = encoder.finish().unwrap();
867        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
868        let mut decoded = Vec::new();
869        decoder.read_to_end(&mut decoded).unwrap();
870        assert_eq!(decoded, payload);
871    }
872
873    #[test]
874    fn zero_window_matcher_returns_invalid_input_error() {
875        let mut encoder = StreamingEncoder::new_with_matcher(
876            TinyMatcher::new(0),
877            Vec::new(),
878            CompressionLevel::Fastest,
879        );
880        let err = encoder.write_all(b"payload").unwrap_err();
881        assert_eq!(err.kind(), ErrorKind::InvalidInput);
882    }
883
884    #[test]
885    fn best_level_streaming_roundtrip() {
886        // 200 KiB payload crosses the 128 KiB block boundary, exercising
887        // multi-block emission and matcher state carry-over for Best.
888        let payload = b"best-level-streaming-test".repeat(8 * 1024);
889        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Best);
890        for chunk in payload.chunks(53) {
891            encoder.write_all(chunk).unwrap();
892        }
893        let compressed = encoder.finish().unwrap();
894        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
895        let mut decoded = Vec::new();
896        decoder.read_to_end(&mut decoded).unwrap();
897        assert_eq!(decoded, payload);
898    }
899
900    #[test]
901    fn write_failure_poisoning_is_sticky() {
902        let mut encoder = StreamingEncoder::new_with_matcher(
903            TinyMatcher::new(4),
904            FailingWriteOnce::new(1),
905            CompressionLevel::Uncompressed,
906        );
907
908        assert!(encoder.write_all(b"ABCD").is_err());
909        assert!(encoder.flush().is_err());
910        assert!(encoder.write_all(b"EFGH").is_err());
911        assert_eq!(encoder.get_ref().sink.len(), 0);
912        assert!(encoder.finish().is_err());
913    }
914
915    #[test]
916    fn poisoned_encoder_returns_original_error_kind() {
917        let mut encoder = StreamingEncoder::new_with_matcher(
918            TinyMatcher::new(4),
919            FailingWithKind::new(1, ErrorKind::BrokenPipe),
920            CompressionLevel::Uncompressed,
921        );
922
923        let first_error = encoder.write_all(b"ABCD").unwrap_err();
924        assert_eq!(first_error.kind(), ErrorKind::BrokenPipe);
925
926        let second_error = encoder.write_all(b"EFGH").unwrap_err();
927        assert_eq!(second_error.kind(), ErrorKind::BrokenPipe);
928    }
929
930    #[test]
931    fn write_reports_progress_but_poisoning_is_sticky_after_later_block_failure() {
932        let payload = b"ABCDEFGHIJKL";
933        let mut encoder = StreamingEncoder::new_with_matcher(
934            TinyMatcher::new(4),
935            FailingWriteOnce::new(3),
936            CompressionLevel::Uncompressed,
937        );
938
939        let first_write = encoder.write(payload).unwrap();
940        assert_eq!(first_write, 8);
941        assert!(encoder.write(&payload[first_write..]).is_err());
942        assert!(encoder.flush().is_err());
943        assert!(encoder.write_all(b"EFGH").is_err());
944    }
945
946    #[test]
947    fn partial_write_failure_after_progress_poisons_encoder() {
948        let payload = b"ABCDEFGHIJKL";
949        let mut encoder = StreamingEncoder::new_with_matcher(
950            TinyMatcher::new(4),
951            PartialThenFailWriter::new(3, 1),
952            CompressionLevel::Uncompressed,
953        );
954
955        let first_write = encoder.write(payload).unwrap();
956        assert_eq!(first_write, 8);
957
958        let second_write = encoder.write(&payload[first_write..]);
959        assert!(second_write.is_err());
960        assert!(encoder.flush().is_err());
961        assert!(encoder.write_all(b"MNOP").is_err());
962    }
963
964    #[test]
965    fn new_with_matcher_and_get_mut_work() {
966        let matcher = TinyMatcher::new(128 * 1024);
967        let mut encoder =
968            StreamingEncoder::new_with_matcher(matcher, Vec::new(), CompressionLevel::Fastest);
969        encoder.get_mut().extend_from_slice(b"");
970        encoder.write_all(b"custom-matcher").unwrap();
971        let compressed = encoder.finish().unwrap();
972        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
973        let mut decoded = Vec::new();
974        decoder.read_to_end(&mut decoded).unwrap();
975        assert_eq!(decoded, b"custom-matcher");
976    }
977
978    #[cfg(feature = "std")]
979    #[test]
980    fn streaming_encoder_output_decompresses_with_c_zstd() {
981        let payload = b"tenant=demo op=put key=streaming value=abcdef\n".repeat(4096);
982        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
983        for chunk in payload.chunks(1024) {
984            encoder.write_all(chunk).unwrap();
985        }
986        let compressed = encoder.finish().unwrap();
987
988        let mut decoded = Vec::with_capacity(payload.len());
989        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
990        assert_eq!(decoded, payload);
991    }
992
993    #[test]
994    fn pledged_content_size_written_in_header() {
995        let payload = b"hello world, pledged size test";
996        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
997        encoder
998            .set_pledged_content_size(payload.len() as u64)
999            .unwrap();
1000        encoder.write_all(payload).unwrap();
1001        let compressed = encoder.finish().unwrap();
1002
1003        // Verify FCS is present and correct
1004        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1005            .unwrap()
1006            .0;
1007        assert_eq!(header.frame_content_size(), payload.len() as u64);
1008
1009        // Verify roundtrip
1010        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1011        let mut decoded = Vec::new();
1012        decoder.read_to_end(&mut decoded).unwrap();
1013        assert_eq!(decoded, payload);
1014    }
1015
1016    #[test]
1017    fn pledged_content_size_mismatch_returns_error() {
1018        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1019        encoder.set_pledged_content_size(100).unwrap();
1020        encoder.write_all(b"short payload").unwrap(); // 13 bytes != 100 pledged
1021        let err = encoder.finish().unwrap_err();
1022        assert_eq!(err.kind(), ErrorKind::InvalidInput);
1023    }
1024
1025    #[test]
1026    fn write_exceeding_pledge_returns_error() {
1027        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1028        encoder.set_pledged_content_size(5).unwrap();
1029        let err = encoder.write_all(b"exceeds five bytes").unwrap_err();
1030        assert_eq!(err.kind(), ErrorKind::InvalidInput);
1031    }
1032
1033    #[test]
1034    fn write_straddling_pledge_reports_partial_progress() {
1035        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1036        encoder.set_pledged_content_size(5).unwrap();
1037        // write() should accept exactly 5 bytes (partial progress)
1038        assert_eq!(encoder.write(b"abcdef").unwrap(), 5);
1039        // Next write should fail — pledge exhausted
1040        let err = encoder.write(b"g").unwrap_err();
1041        assert_eq!(err.kind(), ErrorKind::InvalidInput);
1042    }
1043
1044    #[test]
1045    fn encoded_scratch_capacity_is_reused_across_blocks() {
1046        let payload = vec![0xAB; 64 * 3];
1047        let mut encoder = StreamingEncoder::new_with_matcher(
1048            TinyMatcher::new(64),
1049            Vec::new(),
1050            CompressionLevel::Uncompressed,
1051        );
1052
1053        encoder.write_all(&payload[..64]).unwrap();
1054        let first_capacity = encoder.encoded_scratch.capacity();
1055        assert!(
1056            first_capacity >= 67,
1057            "expected encoded scratch to keep block header + payload capacity",
1058        );
1059
1060        encoder.write_all(&payload[64..128]).unwrap();
1061        let second_capacity = encoder.encoded_scratch.capacity();
1062        assert!(
1063            second_capacity >= first_capacity,
1064            "encoded scratch capacity should be reused across block emits",
1065        );
1066
1067        encoder.write_all(&payload[128..]).unwrap();
1068        let compressed = encoder.finish().unwrap();
1069        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1070        let mut decoded = Vec::new();
1071        decoder.read_to_end(&mut decoded).unwrap();
1072        assert_eq!(decoded, payload);
1073    }
1074
1075    #[test]
1076    fn pledged_content_size_after_write_returns_error() {
1077        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1078        encoder.write_all(b"already writing").unwrap();
1079        let err = encoder.set_pledged_content_size(15).unwrap_err();
1080        assert_eq!(err.kind(), ErrorKind::InvalidInput);
1081    }
1082
1083    #[test]
1084    fn source_size_hint_directly_reduces_window_header() {
1085        let payload = b"streaming-source-size-hint".repeat(64);
1086
1087        let mut no_hint = StreamingEncoder::new(Vec::new(), CompressionLevel::from_level(11));
1088        no_hint.write_all(payload.as_slice()).unwrap();
1089        let no_hint_frame = no_hint.finish().unwrap();
1090        let no_hint_header = crate::decoding::frame::read_frame_header(no_hint_frame.as_slice())
1091            .unwrap()
1092            .0;
1093        let no_hint_window = no_hint_header.window_size().unwrap();
1094
1095        let mut with_hint = StreamingEncoder::new(Vec::new(), CompressionLevel::from_level(11));
1096        with_hint
1097            .set_source_size_hint(payload.len() as u64)
1098            .unwrap();
1099        with_hint.write_all(payload.as_slice()).unwrap();
1100        let late_hint_err = with_hint
1101            .set_source_size_hint(payload.len() as u64)
1102            .unwrap_err();
1103        assert_eq!(late_hint_err.kind(), ErrorKind::InvalidInput);
1104        let with_hint_frame = with_hint.finish().unwrap();
1105        let with_hint_header =
1106            crate::decoding::frame::read_frame_header(with_hint_frame.as_slice())
1107                .unwrap()
1108                .0;
1109        let with_hint_window = with_hint_header.window_size().unwrap();
1110
1111        assert!(
1112            with_hint_window <= no_hint_window,
1113            "source size hint should not increase advertised window"
1114        );
1115
1116        let mut decoder = StreamingDecoder::new(with_hint_frame.as_slice()).unwrap();
1117        let mut decoded = Vec::new();
1118        decoder.read_to_end(&mut decoded).unwrap();
1119        assert_eq!(decoded, payload);
1120    }
1121
1122    #[cfg(feature = "std")]
1123    #[test]
1124    fn pledged_content_size_c_zstd_compatible() {
1125        let payload = b"tenant=demo op=put key=streaming value=abcdef\n".repeat(4096);
1126        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1127        encoder
1128            .set_pledged_content_size(payload.len() as u64)
1129            .unwrap();
1130        for chunk in payload.chunks(1024) {
1131            encoder.write_all(chunk).unwrap();
1132        }
1133        let compressed = encoder.finish().unwrap();
1134
1135        // FCS should be written
1136        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1137            .unwrap()
1138            .0;
1139        assert_eq!(header.frame_content_size(), payload.len() as u64);
1140
1141        // C zstd should decompress successfully
1142        let mut decoded = Vec::new();
1143        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
1144        assert_eq!(decoded, payload);
1145    }
1146
1147    #[test]
1148    fn single_segment_requires_pledged_to_fit_matcher_window() {
1149        let payload = b"streaming-window-gate-".repeat(60); // 1320 bytes
1150        let mut encoder = StreamingEncoder::new_with_matcher(
1151            TinyMatcher::new(1024),
1152            Vec::new(),
1153            CompressionLevel::Fastest,
1154        );
1155        encoder
1156            .set_pledged_content_size(payload.len() as u64)
1157            .unwrap();
1158        encoder.write_all(payload.as_slice()).unwrap();
1159        let compressed = encoder.finish().unwrap();
1160
1161        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1162            .unwrap()
1163            .0;
1164        assert_eq!(header.frame_content_size(), payload.len() as u64);
1165        assert!(
1166            !header.descriptor.single_segment_flag(),
1167            "single-segment must stay off when pledged content size exceeds matcher window"
1168        );
1169        assert!(
1170            header.window_size().unwrap() >= 1024,
1171            "window descriptor should be present when single-segment is disabled"
1172        );
1173    }
1174
1175    #[test]
1176    fn no_pledged_size_omits_fcs_from_header() {
1177        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1178        encoder.write_all(b"no pledged size").unwrap();
1179        let compressed = encoder.finish().unwrap();
1180
1181        // FCS should be omitted from the header; the decoder reports absent FCS as 0.
1182        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1183            .unwrap()
1184            .0;
1185        assert_eq!(header.frame_content_size(), 0);
1186        // Verify the descriptor confirms FCS field is truly absent (0 bytes),
1187        // not just FCS present with value 0.
1188        assert_eq!(header.descriptor.frame_content_size_bytes().unwrap(), 0);
1189    }
1190}