structured_zstd/decoding/streaming_decoder.rs
1//! The [StreamingDecoder] wraps a [FrameDecoder] and provides a Read impl that decodes data when necessary
2
3use core::borrow::BorrowMut;
4
5use crate::common::MAX_BLOCK_SIZE;
6use crate::decoding::errors::FrameDecoderError;
7use crate::decoding::{BlockDecodingStrategy, DictionaryHandle, FrameDecoder};
8#[cfg(not(feature = "std"))]
9use crate::io::ErrorKind;
10use crate::io::{Error, Read};
11
12/// High level Zstandard frame decoder that can be used to decompress a given Zstandard frame.
13///
14/// This decoder implements `io::Read`, so you can interact with it by calling
15/// `io::Read::read_to_end` / `io::Read::read_exact` or passing this to another library / module as a source for the decoded content
16///
17/// If you need more control over how decompression takes place, you can use
18/// the lower level [FrameDecoder], which allows for greater control over how
19/// decompression takes place but the implementor must call
20/// [FrameDecoder::decode_blocks] repeatedly to decode the entire frame.
21///
22/// ## Caveat
23/// Plain `read` / `read_exact` operate on the single frame this decoder was
24/// initialised with: they do not advance into following frames. `read_to_end`,
25/// by contrast, is specialised to consume a finite source to EOF, decoding
26/// concatenated frames and skipping skippable frames along the way.
27///
28/// To recover the bytes that follow one frame WITHOUT consuming the rest of the
29/// source, recreate the decoder manually and handle
30/// [crate::decoding::errors::ReadFrameHeaderError::SkipFrame]
31/// errors by skipping forward the `length` amount of bytes, see <https://github.com/KillingSpark/zstd-rs/issues/57>
32///
33/// ```no_run
34/// // `File` is std-only; `read_to_end` itself is available under no_std too.
35/// #[cfg(feature = "std")]
36/// {
37/// use std::fs::File;
38/// use std::io::Read;
39/// use structured_zstd::decoding::StreamingDecoder;
40///
41/// // Read a Zstandard archive from the filesystem then decompress it into a vec.
42/// let mut f: File = todo!("Read a .zstd archive from somewhere");
43/// let mut decoder = StreamingDecoder::new(f).unwrap();
44/// let mut result = Vec::new();
45/// Read::read_to_end(&mut decoder, &mut result).unwrap();
46/// }
47/// ```
pub struct StreamingDecoder<READ: Read, DEC: BorrowMut<FrameDecoder>> {
    /// The frame decoder that performs the actual decompression. `DEC` is
    /// anything that can be mutably borrowed as a [`FrameDecoder`]. The field
    /// is public, so decoder-level settings can be adjusted directly on it
    /// after construction.
    pub decoder: DEC,
    /// The reader the compressed bytes are pulled from. Reachable through
    /// `get_ref` / `get_mut` / `into_inner`.
    source: READ,
    /// Dictionary the decoder was constructed with, if any. Retained so the
    /// `read_to_end` paths can re-initialise FOLLOWING concatenated frames with
    /// the same forced dictionary (a plain re-init resolves dictionaries by
    /// frame id only and would lose a forced dict for frames omitting the id).
    /// Cheap to hold: `DictionaryHandle` is an `Arc`/`Rc` handle.
    /// `None` when the decoder was built without a dictionary constructor.
    dict: Option<DictionaryHandle>,
}
58
59impl<READ: Read, DEC: BorrowMut<FrameDecoder>> StreamingDecoder<READ, DEC> {
60 pub fn new_with_decoder(
61 mut source: READ,
62 mut decoder: DEC,
63 ) -> Result<StreamingDecoder<READ, DEC>, FrameDecoderError> {
64 decoder.borrow_mut().init(&mut source)?;
65 Ok(StreamingDecoder {
66 decoder,
67 source,
68 dict: None,
69 })
70 }
71}
72
73impl<READ: Read> StreamingDecoder<READ, FrameDecoder> {
74 pub fn new(
75 mut source: READ,
76 ) -> Result<StreamingDecoder<READ, FrameDecoder>, FrameDecoderError> {
77 let mut decoder = FrameDecoder::new();
78 decoder.init(&mut source)?;
79 Ok(StreamingDecoder {
80 decoder,
81 source,
82 dict: None,
83 })
84 }
85
86 /// Create a streaming decoder using a pre-parsed dictionary handle.
87 ///
88 /// # Warning
89 ///
90 /// This constructor initializes the underlying [`FrameDecoder`] with
91 /// `dict`, even if a frame header omits the optional dictionary ID.
92 /// Callers must only use it when they already know the stream was encoded
93 /// with this dictionary; otherwise decoded output can be silently
94 /// corrupted.
95 pub fn new_with_dictionary_handle(
96 mut source: READ,
97 dict: &DictionaryHandle,
98 ) -> Result<StreamingDecoder<READ, FrameDecoder>, FrameDecoderError> {
99 let mut decoder = FrameDecoder::new();
100 decoder.init_with_dict_handle(&mut source, dict)?;
101 Ok(StreamingDecoder {
102 decoder,
103 source,
104 dict: Some(dict.clone()),
105 })
106 }
107
108 /// Create a streaming decoder using a serialized dictionary blob.
109 ///
110 /// # Warning
111 ///
112 /// This API forwards to [`StreamingDecoder::new_with_dictionary_handle`]
113 /// and therefore applies the decoded dictionary to frames whose headers may
114 /// omit the optional dictionary ID. Only use it when the stream is known to
115 /// be encoded with that dictionary.
116 pub fn new_with_dictionary_bytes(
117 source: READ,
118 raw_dictionary: &[u8],
119 ) -> Result<StreamingDecoder<READ, FrameDecoder>, FrameDecoderError> {
120 let dict = DictionaryHandle::decode_dict(raw_dictionary)?;
121 Self::new_with_dictionary_handle(source, &dict)
122 }
123}
124
125impl<READ: Read, DEC: BorrowMut<FrameDecoder>> StreamingDecoder<READ, DEC> {
126 /// Gets a reference to the underlying reader.
127 pub fn get_ref(&self) -> &READ {
128 &self.source
129 }
130
131 /// Gets a mutable reference to the underlying reader.
132 ///
133 /// It is inadvisable to directly read from the underlying reader.
134 pub fn get_mut(&mut self) -> &mut READ {
135 &mut self.source
136 }
137
138 /// Destructures this object into the inner reader.
139 pub fn into_inner(self) -> READ
140 where
141 READ: Sized,
142 {
143 self.source
144 }
145
146 /// Destructures this object into both the inner reader and [FrameDecoder].
147 pub fn into_parts(self) -> (READ, DEC)
148 where
149 READ: Sized,
150 {
151 (self.source, self.decoder)
152 }
153
154 /// Destructures this object into the inner [FrameDecoder].
155 pub fn into_frame_decoder(self) -> DEC {
156 self.decoder
157 }
158}
159
160impl<READ: Read, DEC: BorrowMut<FrameDecoder>> Read for StreamingDecoder<READ, DEC> {
161 fn read(&mut self, buf: &mut [u8]) -> Result<usize, Error> {
162 let decoder = self.decoder.borrow_mut();
163 if decoder.is_finished() && decoder.can_collect() == 0 {
164 // Frame fully decoded and fully drained: the running XXH64 digest
165 // is final, so a `Verify`-mode decoder validates the content
166 // checksum at this finish point. No-op in other modes.
167 #[cfg(feature = "hash")]
168 if let Err(e) = decoder.verify_content_checksum() {
169 #[cfg(feature = "std")]
170 return Err(Error::other(e));
171 #[cfg(not(feature = "std"))]
172 return Err(Error::new(ErrorKind::Other, alloc::boxed::Box::new(e)));
173 }
174 //No more bytes can ever be decoded
175 return Ok(0);
176 }
177
178 // Interleave bounded decode with draining so the decode window
179 // (`RingBuffer`) stays near `window_size` instead of accumulating the
180 // whole request before a single end-of-call drain. `read_to_end` hands
181 // ever-larger buffers; decoding `buf.len()` worth into the ring up
182 // front grew it far past the window (repeated `reserve_amortized`
183 // alloc+copy). Decode at most one block worth per step, then drain
184 // what is now collectable into `buf`, mirroring upstream zstd's
185 // window-bounded flush loop.
186 let mut written = 0;
187 while written < buf.len() {
188 // Drain whatever is collectable now (retaining `window_size` until
189 // the frame finishes). Reclaims the ring promptly so the next
190 // decode step reuses the same capacity.
191 written += decoder.read(&mut buf[written..])?;
192 if written == buf.len() || decoder.is_finished() {
193 break;
194 }
195 // Decode one bounded chunk. `UptoBytes` may overshoot a little but
196 // is capped to one block, so the ring's live region stays within
197 // `window_size + MAX_BLOCK_SIZE`.
198 let step = (buf.len() - written).min(MAX_BLOCK_SIZE as usize);
199 if let Err(e) =
200 decoder.decode_blocks(&mut self.source, BlockDecodingStrategy::UptoBytes(step))
201 {
202 #[cfg(feature = "std")]
203 {
204 return Err(Error::other(e));
205 }
206 #[cfg(not(feature = "std"))]
207 {
208 return Err(Error::new(ErrorKind::Other, alloc::boxed::Box::new(e)));
209 }
210 }
211 }
212
213 // The loop can finish AND fully drain a frame within this same call
214 // (decode last block, then drain it into `buf`). Validate here too when
215 // the frame is finished and nothing is left to collect, but ONLY when
216 // this call wrote no bytes: the `Read` contract forbids returning `Err`
217 // after bytes were delivered, so when `written > 0` the verify is
218 // deferred to the next call, where the top early-return runs it and
219 // returns `Err` on the zero-byte path. Idempotent with that top check.
220 #[cfg(feature = "hash")]
221 if written == 0
222 && decoder.is_finished()
223 && decoder.can_collect() == 0
224 && let Err(e) = decoder.verify_content_checksum()
225 {
226 #[cfg(feature = "std")]
227 return Err(Error::other(e));
228 #[cfg(not(feature = "std"))]
229 return Err(Error::new(ErrorKind::Other, alloc::boxed::Box::new(e)));
230 }
231
232 Ok(written)
233 }
234
235 /// Decode-in-place fast path for whole-frame consumption. Instead of the
236 /// generic `read` loop (decode block -> `RingBuffer` -> copy into the
237 /// caller buffer), buffer the (compressed, hence small) source and decode
238 /// STRAIGHT into `output`'s spare capacity via the single-copy direct path,
239 /// pre-sized from the frame's declared content size. Only taken when the
240 /// decoder is at a frame boundary (nothing partially decoded / undrained);
241 /// otherwise it falls back to the generic grow-and-`read` loop so a caller
242 /// that mixed `read` with `read_to_end` still gets correct output.
243 ///
244 /// Per the `Read::read_to_end` contract this consumes the source to EOF: if
245 /// the stream holds several concatenated frames they are ALL decoded (and
246 /// skippable frames skipped). To recover bytes that follow a single frame,
247 /// use `read` plus the
248 /// [`SkipFrame`](crate::decoding::errors::ReadFrameHeaderError::SkipFrame)
249 /// recreate-the-decoder pattern instead.
250 #[cfg(feature = "std")]
251 fn read_to_end(&mut self, output: &mut alloc::vec::Vec<u8>) -> Result<usize, Error> {
252 let start_total = output.len();
253 // `new()` already read the frame header, so the fast path applies when
254 // the decoder sits at the start of that frame with nothing decoded yet.
255 let at_start = {
256 let d = self.decoder.borrow_mut();
257 d.is_at_frame_start() && d.can_collect() == 0
258 };
259 // Clone the (cheap Arc/Rc) dict handle out so the `decoder` borrow below
260 // does not conflict with borrowing `self.dict`.
261 let dict = self.dict.clone();
262 if at_start {
263 let mut compressed = alloc::vec::Vec::new();
264 self.source.read_to_end(&mut compressed)?;
265 self.decoder
266 .borrow_mut()
267 .decode_current_frame_to_vec(&compressed, output, dict.as_ref())
268 .map_err(Error::other)?;
269 return Ok(output.len() - start_total);
270 }
271 // Mid-frame fallback: drain the partially-read CURRENT frame through the
272 // generic path, then decode any FOLLOWING concatenated frames so
273 // read_to_end still consumes the source to true EOF.
274 loop {
275 let start = output.len();
276 output.resize(start + MAX_BLOCK_SIZE as usize, 0);
277 // On error, drop the just-grown (zeroed) tail before propagating so
278 // the caller never observes bytes that were never decoded.
279 let n = match self.read(&mut output[start..]) {
280 Ok(n) => n,
281 Err(e) => {
282 output.truncate(start);
283 return Err(e);
284 }
285 };
286 output.truncate(start + n);
287 if n == 0 {
288 break;
289 }
290 }
291 // Current frame fully drained; `source` is positioned at the next frame.
292 let mut rest = alloc::vec::Vec::new();
293 self.source.read_to_end(&mut rest)?;
294 if !rest.is_empty() {
295 let mut input = rest.as_slice();
296 self.decoder
297 .borrow_mut()
298 .decode_concatenated_frames_to_vec(&mut input, output, dict.as_ref())
299 .map_err(Error::other)?;
300 }
301 Ok(output.len() - start_total)
302 }
303
304 /// no_std counterpart of the decode-in-place `read_to_end` fast path above
305 /// (the no_std `Read::read_to_end` returns `()` instead of the byte count).
306 #[cfg(not(feature = "std"))]
307 fn read_to_end(&mut self, output: &mut alloc::vec::Vec<u8>) -> Result<(), Error> {
308 let at_start = {
309 let d = self.decoder.borrow_mut();
310 d.is_at_frame_start() && d.can_collect() == 0
311 };
312 // Cheap Arc/Rc clone so the `decoder` borrow does not conflict with
313 // borrowing `self.dict`.
314 let dict = self.dict.clone();
315 if at_start {
316 let mut compressed = alloc::vec::Vec::new();
317 self.source.read_to_end(&mut compressed)?;
318 self.decoder
319 .borrow_mut()
320 .decode_current_frame_to_vec(&compressed, output, dict.as_ref())
321 .map_err(|e| Error::new(ErrorKind::Other, alloc::boxed::Box::new(e)))?;
322 return Ok(());
323 }
324 // Mid-frame fallback: drain the partial CURRENT frame, then decode the
325 // FOLLOWING concatenated frames so the source is consumed to true EOF.
326 loop {
327 let start = output.len();
328 output.resize(start + MAX_BLOCK_SIZE as usize, 0);
329 // On error, drop the just-grown (zeroed) tail before propagating so
330 // the caller never observes bytes that were never decoded.
331 let n = match self.read(&mut output[start..]) {
332 Ok(n) => n,
333 Err(e) => {
334 output.truncate(start);
335 return Err(e);
336 }
337 };
338 output.truncate(start + n);
339 if n == 0 {
340 break;
341 }
342 }
343 let mut rest = alloc::vec::Vec::new();
344 self.source.read_to_end(&mut rest)?;
345 if !rest.is_empty() {
346 let mut input = rest.as_slice();
347 self.decoder
348 .borrow_mut()
349 .decode_concatenated_frames_to_vec(&mut input, output, dict.as_ref())
350 .map_err(|e| Error::new(ErrorKind::Other, alloc::boxed::Box::new(e)))?;
351 }
352 Ok(())
353 }
354}
355
356#[cfg(test)]
357mod tests {
358 use super::StreamingDecoder;
359 use crate::io::Read;
360
361 /// `Read::read` must not return `Err` after it has already written bytes
362 /// into the caller's buffer (the trait mandates that an error implies no
363 /// bytes were read). When a single `read` call both drains the final bytes
364 /// of a `Verify`-mode frame AND finishes it, a checksum mismatch must be
365 /// deferred: those bytes are delivered as `Ok(n)` and the error surfaces on
366 /// the next (zero-byte) call, where returning `Err` violates no contract.
367 #[cfg(feature = "hash")]
368 #[test]
369 fn read_delivering_bytes_defers_checksum_error_to_next_call() {
370 use crate::decoding::ContentChecksum;
371 use crate::encoding::{CompressionLevel, FrameCompressor};
372 use crate::io::ErrorKind;
373 use alloc::vec;
374 use alloc::vec::Vec;
375
376 let payload: Vec<u8> = (0..8192u32).map(|i| (i & 0xFF) as u8).collect();
377 let mut compressor = FrameCompressor::new(CompressionLevel::Default);
378 // Checksum is the subject under test; the encoder default is off
379 // (upstream library parity).
380 compressor.set_content_checksum(true);
381 compressor.set_source(payload.as_slice());
382 let mut compressed = Vec::new();
383 compressor.set_drain(&mut compressed);
384 compressor.compress();
385
386 // Corrupt the trailing 4-byte content checksum: the body still decodes
387 // to the right bytes, but the stored digest no longer matches.
388 let last = compressed.len() - 1;
389 compressed[last] ^= 0xFF;
390
391 let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
392 decoder
393 .decoder
394 .set_content_checksum(ContentChecksum::Verify);
395
396 // A buffer large enough to drain the whole frame in one call: this call
397 // finishes the frame AND writes every payload byte. The mismatch must
398 // NOT abort it (that would drop the delivered bytes).
399 let mut buf = vec![0u8; payload.len() + 4096];
400 let n = decoder
401 .read(&mut buf)
402 .expect("a read that delivered bytes must not return the checksum Err");
403 assert_eq!(n, payload.len());
404 assert_eq!(&buf[..n], payload.as_slice());
405
406 // The deferred mismatch surfaces on the terminating zero-byte read.
407 let err = decoder
408 .read(&mut buf)
409 .expect_err("deferred checksum mismatch must surface on the terminating read");
410 assert_eq!(err.kind(), ErrorKind::Other);
411 }
412
413 /// A fresh `read_to_end` must take the single-copy decode-in-place path
414 /// (FCS-declared frame decoded straight into the output `Vec`, no ring
415 /// drain) AND reproduce the payload byte-for-byte.
416 #[cfg(feature = "std")]
417 #[test]
418 fn read_to_end_decode_in_place_matches_and_takes_direct_path() {
419 use crate::encoding::{CompressionLevel, FrameCompressor};
420 use alloc::vec::Vec;
421
422 let payload: Vec<u8> = (0..20_000u32)
423 .map(|i| (i.wrapping_mul(2654435761) >> 24) as u8)
424 .collect();
425 let mut compressor = FrameCompressor::new(CompressionLevel::Default);
426 compressor.set_source(payload.as_slice());
427 let mut compressed = Vec::new();
428 compressor.set_drain(&mut compressed);
429 compressor.compress();
430
431 let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
432 let mut out = Vec::new();
433 let n = decoder.read_to_end(&mut out).unwrap();
434 assert_eq!(n, payload.len());
435 assert_eq!(out, payload);
436 // FrameCompressor declares FCS, so the fresh fast path used the direct
437 // (decode-in-place) route, not the ring drain.
438 assert_eq!(decoder.decoder.direct_frames(), 1);
439 }
440
441 /// `read_to_end` after a partial `read` must still produce the full
442 /// payload. The decoder is mid-frame, so the fast path is skipped and the
443 /// generic grow-and-drain fallback runs (no direct frame).
444 #[cfg(feature = "std")]
445 #[test]
446 fn read_to_end_after_partial_read_is_complete() {
447 use crate::encoding::{CompressionLevel, FrameCompressor};
448 use alloc::vec;
449 use alloc::vec::Vec;
450
451 let payload: Vec<u8> = (0..20_000u32).map(|i| (i & 0xFF) as u8).collect();
452 let mut compressor = FrameCompressor::new(CompressionLevel::Default);
453 compressor.set_source(payload.as_slice());
454 let mut compressed = Vec::new();
455 compressor.set_drain(&mut compressed);
456 compressor.compress();
457
458 let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
459 let mut head = vec![0u8; 4096];
460 let got = decoder.read(&mut head).unwrap();
461 assert!(got > 0 && got <= head.len());
462
463 let mut out = Vec::new();
464 out.extend_from_slice(&head[..got]);
465 decoder.read_to_end(&mut out).unwrap();
466 assert_eq!(out, payload);
467 // Mid-frame entry → fallback path, never the direct route.
468 assert_eq!(decoder.decoder.direct_frames(), 0);
469 }
470
471 /// `read_to_end` reads the WHOLE source to EOF: a stream of concatenated
472 /// frames must decode every frame, not just the first. (The fast path
473 /// buffers the whole source, so dropping the trailing frame would lose
474 /// data.)
475 #[cfg(feature = "std")]
476 #[test]
477 fn read_to_end_decodes_all_concatenated_frames() {
478 use crate::encoding::{CompressionLevel, compress_slice_to_vec};
479 use alloc::vec::Vec;
480
481 let a: Vec<u8> = (0..5000u32).map(|i| (i & 0xFF) as u8).collect();
482 let b: Vec<u8> = (0..3000u32)
483 .map(|i| ((i.wrapping_mul(7)) & 0xFF) as u8)
484 .collect();
485 let mut stream = compress_slice_to_vec(&a, CompressionLevel::Level(3));
486 stream.extend_from_slice(&compress_slice_to_vec(&b, CompressionLevel::Level(3)));
487
488 let mut decoder = StreamingDecoder::new(stream.as_slice()).unwrap();
489 let mut out = Vec::new();
490 decoder.read_to_end(&mut out).unwrap();
491
492 let mut expected = a.clone();
493 expected.extend_from_slice(&b);
494 assert_eq!(out, expected);
495 // Both FCS-declared frames took the direct path.
496 assert_eq!(decoder.decoder.direct_frames(), 2);
497 }
498
499 /// `read_to_end` after a partial `read` must STILL consume the source to
500 /// EOF across concatenated frames, not stop at the current frame's end. The
501 /// partial read forces the mid-frame fallback path; with two concatenated
502 /// frames the fallback must finish frame 1, then advance through frame 2.
503 #[cfg(feature = "std")]
504 #[test]
505 fn read_to_end_after_partial_read_decodes_all_concatenated_frames() {
506 use crate::encoding::{CompressionLevel, compress_slice_to_vec};
507 use alloc::vec;
508 use alloc::vec::Vec;
509
510 let a: Vec<u8> = (0..6000u32).map(|i| (i & 0xFF) as u8).collect();
511 let b: Vec<u8> = (0..4000u32)
512 .map(|i| ((i.wrapping_mul(11)) & 0xFF) as u8)
513 .collect();
514 let mut stream = compress_slice_to_vec(&a, CompressionLevel::Level(3));
515 stream.extend_from_slice(&compress_slice_to_vec(&b, CompressionLevel::Level(3)));
516
517 let mut decoder = StreamingDecoder::new(stream.as_slice()).unwrap();
518 // Partial read of frame 1 → mid-frame, so read_to_end takes the fallback.
519 let mut head = vec![0u8; 2048];
520 let got = decoder.read(&mut head).unwrap();
521 assert!(got > 0 && got <= head.len());
522
523 let mut out = Vec::new();
524 out.extend_from_slice(&head[..got]);
525 decoder.read_to_end(&mut out).unwrap();
526
527 let mut expected = a.clone();
528 expected.extend_from_slice(&b);
529 assert_eq!(
530 out, expected,
531 "fallback path must decode frame 2 too, not stop at frame 1 EOF"
532 );
533 }
534
535 /// `read_to_end` on a stream of concatenated DICTIONARY frames must decode
536 /// every frame WITH the dictionary the decoder was constructed with. The
537 /// fast-path concatenated loop re-initialises following frames, and a plain
538 /// re-init resolves dictionaries by frame id only — losing the forced
539 /// dictionary for frames that omit (or can't resolve) the id.
540 #[cfg(feature = "std")]
541 #[test]
542 fn read_to_end_concatenated_dict_frames_decode_with_dictionary() {
543 use crate::encoding::{CompressionLevel, FrameCompressor};
544 use alloc::vec::Vec;
545
546 let dict_raw = include_bytes!("../../dict_tests/dictionary");
547 let compress_with_dict = |payload: &[u8]| -> Vec<u8> {
548 let mut compressor = FrameCompressor::new(CompressionLevel::Default);
549 compressor
550 .set_dictionary_from_bytes(dict_raw)
551 .expect("dict load");
552 compressor.set_source(payload);
553 let mut compressed = Vec::new();
554 compressor.set_drain(&mut compressed);
555 compressor.compress();
556 compressed
557 };
558
559 let a = b"first dictionary-compressed frame payload".to_vec();
560 let b = b"second dictionary-compressed frame payload".to_vec();
561 let mut stream = compress_with_dict(&a);
562 stream.extend_from_slice(&compress_with_dict(&b));
563
564 let mut decoder =
565 StreamingDecoder::new_with_dictionary_bytes(stream.as_slice(), dict_raw).unwrap();
566 let mut out = Vec::new();
567 decoder
568 .read_to_end(&mut out)
569 .expect("both dict frames must decode with the forced dictionary");
570
571 let mut expected = a.clone();
572 expected.extend_from_slice(&b);
573 assert_eq!(out, expected);
574 }
575
576 /// A direct-path decode error must NOT leave non-decoded bytes in `output`.
577 /// The fast path resizes `output` to the declared content size before
578 /// decoding; if decode fails, the enlarged (zeroed) tail must be truncated
579 /// away so callers never observe bytes that were never decoded.
580 #[cfg(feature = "std")]
581 #[test]
582 fn read_to_end_truncates_output_on_direct_decode_error() {
583 use crate::encoding::{CompressionLevel, FrameCompressor};
584 use alloc::vec::Vec;
585
586 let payload: Vec<u8> = (0..5000u32).map(|i| (i & 0xFF) as u8).collect();
587 let mut compressor = FrameCompressor::new(CompressionLevel::Default);
588 compressor.set_source(payload.as_slice());
589 let mut compressed = Vec::new();
590 compressor.set_drain(&mut compressed);
591 compressor.compress();
592 // Truncate the block bytes (the FCS-bearing header at the front stays
593 // intact) so the header parses but the direct-path block decode hits a
594 // premature end → error after `output` was already resized.
595 compressed.truncate(compressed.len() - 40);
596
597 let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
598 let mut out = b"SENTINEL".to_vec();
599 let result = decoder.read_to_end(&mut out);
600 assert!(result.is_err(), "truncated block must fail the decode");
601 assert_eq!(
602 out, b"SENTINEL",
603 "failed direct decode must not append non-decoded bytes to output"
604 );
605 }
606
607 /// The mid-frame fallback grows `output` by `MAX_BLOCK_SIZE` before each
608 /// `self.read`. When that read errors (truncated current frame), the grown
609 /// zero-filled tail must be truncated away before the error propagates, so
610 /// the caller never observes `MAX_BLOCK_SIZE` worth of bytes that were never
611 /// decoded.
612 #[cfg(feature = "std")]
613 #[test]
614 fn read_to_end_truncates_output_on_midframe_fallback_error() {
615 use crate::encoding::{CompressionLevel, CompressionParameters, FrameCompressor};
616 use alloc::vec;
617 use alloc::vec::Vec;
618
619 // Incompressible payload with a window (128 KiB) SMALLER than the input,
620 // so the frame holds several blocks and bytes become collectable while
621 // the frame is still mid-decode. Without a sub-input window the decoder
622 // retains the whole input until the frame finishes, and a partial read
623 // could only ever finish or error, never leave a truncated remainder for
624 // the fallback to trip on.
625 let payload: Vec<u8> = (0..320_000u32)
626 .map(|i| (i.wrapping_mul(2654435761) >> 24) as u8)
627 .collect();
628 let params = CompressionParameters::builder(CompressionLevel::Default)
629 .window_log(17)
630 .build()
631 .expect("window_log within bounds");
632 let mut compressor = FrameCompressor::new(CompressionLevel::Default);
633 compressor.set_parameters(¶ms);
634 compressor.set_source(payload.as_slice());
635 let mut compressed = Vec::new();
636 compressor.set_drain(&mut compressed);
637 compressor.compress();
638 // Truncate the tail so the final block decode fails partway through.
639 compressed.truncate(compressed.len() - 40);
640
641 let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
642 // A partial `read` first: leaves the decoder mid-frame so `read_to_end`
643 // takes the grow-and-drain fallback (not the decode-in-place fast path).
644 let mut head = vec![0u8; 4096];
645 let got = decoder.read(&mut head).unwrap();
646 assert!(got > 0);
647
648 let mut out = Vec::new();
649 out.extend_from_slice(&head[..got]);
650 let result = decoder.read_to_end(&mut out);
651 assert!(
652 result.is_err(),
653 "truncated current frame must fail the decode"
654 );
655 assert!(
656 out.len() <= payload.len(),
657 "failed fallback read must not leave a zero-filled tail (len {} > payload {})",
658 out.len(),
659 payload.len()
660 );
661 assert_eq!(
662 out.as_slice(),
663 &payload[..out.len()],
664 "decoded prefix must match the payload, with no appended non-decoded bytes"
665 );
666 }
667
668 /// An empty (`Frame_Content_Size = 0`) frame decodes to nothing through the
669 /// `read_to_end` fast path — the declared-size validation accepts the valid
670 /// case (produced == 0) instead of erroring.
671 #[cfg(feature = "std")]
672 #[test]
673 fn read_to_end_empty_frame_decodes_to_empty() {
674 use crate::encoding::{CompressionLevel, compress_slice_to_vec};
675 use alloc::vec::Vec;
676
677 let compressed = compress_slice_to_vec(&[], CompressionLevel::Level(3));
678 let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
679 let mut out = Vec::new();
680 decoder.read_to_end(&mut out).unwrap();
681 assert!(out.is_empty());
682 }
683}