1use alloc::format;
2use alloc::string::{String, ToString};
3use alloc::vec::Vec;
4use core::mem;
5
6use crate::common::MAX_BLOCK_SIZE;
7#[cfg(feature = "hash")]
8use core::hash::Hasher;
9#[cfg(feature = "hash")]
10use twox_hash::XxHash64;
11
12use crate::encoding::levels::compress_block_encoded;
13use crate::encoding::{
14 CompressionLevel, MatchGeneratorDriver, Matcher, block_header::BlockHeader,
15 frame_compressor::CompressState, frame_compressor::FseTables, frame_header::FrameHeader,
16};
17use crate::io::{Error, ErrorKind, Write};
18
/// Incremental zstd frame encoder that implements [`Write`].
///
/// Input is buffered into blocks of at most the matcher's window size (capped at
/// `MAX_BLOCK_SIZE`). Each full block is encoded and written to the drain immediately, and
/// [`StreamingEncoder::finish`] emits the last block and optional checksum, then returns the
/// drain. Once a drain write fails, the encoder is poisoned and every later call returns
/// that failure.
pub struct StreamingEncoder<W: Write, M: Matcher = MatchGeneratorDriver> {
    /// Output sink; `Some` until `finish` takes it out.
    drain: Option<W>,
    /// Selects raw blocks (`Uncompressed`) or matcher-based compression for every block.
    compression_level: CompressionLevel,
    /// Matcher plus entropy-coding state carried from block to block within the frame.
    state: CompressState<M>,
    /// Uncompressed bytes buffered for the block currently being filled.
    pending: Vec<u8>,
    /// Reusable staging buffer for an encoded block (header + body) before it is written.
    encoded_scratch: Vec<u8>,
    /// Set once a drain write/flush fails; later operations return the sticky error.
    errored: bool,
    /// Kind of the first recorded failure, replayed by the sticky error.
    last_error_kind: Option<ErrorKind>,
    /// Message of the first recorded failure, replayed by the sticky error.
    last_error_message: Option<String>,
    /// Whether the frame header has been written to the drain.
    frame_started: bool,
    /// Exact content size promised by the caller; written to the header and enforced.
    pledged_content_size: Option<u64>,
    /// Uncompressed bytes accepted by `write` so far.
    bytes_consumed: u64,
    /// Running XXH64 (seed 0) of the uncompressed content; its low 32 bits form the checksum.
    #[cfg(feature = "hash")]
    hasher: XxHash64,
}
39
40impl<W: Write> StreamingEncoder<W, MatchGeneratorDriver> {
41 pub fn new(drain: W, compression_level: CompressionLevel) -> Self {
46 Self::new_with_matcher(
47 MatchGeneratorDriver::new(MAX_BLOCK_SIZE as usize, 1),
48 drain,
49 compression_level,
50 )
51 }
52}
53
54impl<W: Write, M: Matcher> StreamingEncoder<W, M> {
55 pub fn new_with_matcher(matcher: M, drain: W, compression_level: CompressionLevel) -> Self {
60 Self {
61 drain: Some(drain),
62 compression_level,
63 state: CompressState {
64 matcher,
65 last_huff_table: None,
66 fse_tables: FseTables::new(),
67 block_scratch: crate::encoding::blocks::CompressedBlockScratch::new(),
68 offset_hist: [1, 4, 8],
69 },
70 pending: Vec::new(),
71 encoded_scratch: Vec::new(),
72 errored: false,
73 last_error_kind: None,
74 last_error_message: None,
75 frame_started: false,
76 pledged_content_size: None,
77 bytes_consumed: 0,
78 #[cfg(feature = "hash")]
79 hasher: XxHash64::with_seed(0),
80 }
81 }
82
83 pub fn set_pledged_content_size(&mut self, size: u64) -> Result<(), Error> {
94 self.ensure_open()?;
95 if self.frame_started {
96 return Err(invalid_input_error(
97 "pledged content size must be set before the first write",
98 ));
99 }
100 self.pledged_content_size = Some(size);
101 self.state.matcher.set_source_size_hint(size);
104 Ok(())
105 }
106
107 pub fn set_source_size_hint(&mut self, size: u64) -> Result<(), Error> {
115 self.ensure_open()?;
116 if self.frame_started {
117 return Err(invalid_input_error(
118 "source size hint must be set before the first write",
119 ));
120 }
121 self.state.matcher.set_source_size_hint(size);
122 Ok(())
123 }
124
125 pub fn get_ref(&self) -> &W {
130 self.drain
131 .as_ref()
132 .expect("streaming encoder drain is present until finish consumes self")
133 }
134
135 pub fn get_mut(&mut self) -> &mut W {
143 self.drain
144 .as_mut()
145 .expect("streaming encoder drain is present until finish consumes self")
146 }
147
148 pub fn finish(mut self) -> Result<W, Error> {
153 self.ensure_open()?;
154
155 if let Some(pledged) = self.pledged_content_size
159 && self.bytes_consumed != pledged
160 {
161 return Err(invalid_input_error(
162 "pledged content size does not match bytes consumed",
163 ));
164 }
165
166 self.ensure_frame_started()?;
167
168 if self.pending.is_empty() {
169 self.write_empty_last_block()
170 .map_err(|err| self.fail(err))?;
171 } else {
172 self.emit_pending_block(true)?;
173 }
174
175 let mut drain = self
176 .drain
177 .take()
178 .expect("streaming encoder drain must be present when finishing");
179
180 #[cfg(feature = "hash")]
181 {
182 let checksum = self.hasher.finish() as u32;
183 drain
184 .write_all(&checksum.to_le_bytes())
185 .map_err(|err| self.fail(err))?;
186 }
187
188 drain.flush().map_err(|err| self.fail(err))?;
189 Ok(drain)
190 }
191
192 fn ensure_open(&self) -> Result<(), Error> {
193 if self.errored {
194 return Err(self.sticky_error());
195 }
196 Ok(())
197 }
198
199 fn sticky_error(&self) -> Error {
203 match (self.last_error_kind, self.last_error_message.as_deref()) {
204 (Some(kind), Some(message)) => error_with_kind_message(
205 kind,
206 format!(
207 "streaming encoder is in an errored state due to previous {kind:?} failure: {message}"
208 ),
209 ),
210 (Some(kind), None) => error_from_kind(kind),
211 (None, Some(message)) => other_error_owned(format!(
212 "streaming encoder is in an errored state: {message}"
213 )),
214 (None, None) => other_error("streaming encoder is in an errored state"),
215 }
216 }
217
218 fn drain_mut(&mut self) -> Result<&mut W, Error> {
219 self.drain
220 .as_mut()
221 .ok_or_else(|| other_error("streaming encoder has no active drain"))
222 }
223
224 fn ensure_frame_started(&mut self) -> Result<(), Error> {
225 if self.frame_started {
226 return Ok(());
227 }
228
229 self.ensure_level_supported()?;
230 self.state.matcher.reset(self.compression_level);
231 self.state.offset_hist = [1, 4, 8];
232 self.state.last_huff_table = None;
233 self.state.fse_tables.ll_previous = None;
234 self.state.fse_tables.ml_previous = None;
235 self.state.fse_tables.of_previous = None;
236 #[cfg(feature = "hash")]
237 {
238 self.hasher = XxHash64::with_seed(0);
239 }
240
241 let window_size = self.state.matcher.window_size();
242 if window_size == 0 {
243 return Err(invalid_input_error(
244 "matcher reported window_size == 0, which is invalid",
245 ));
246 }
247
248 let single_segment = self
254 .pledged_content_size
255 .map(|size| (512..=(1 << 14)).contains(&size) && size <= window_size)
256 .unwrap_or(false);
257
258 let header = FrameHeader {
259 frame_content_size: self.pledged_content_size,
260 single_segment,
261 content_checksum: cfg!(feature = "hash"),
262 dictionary_id: None,
263 window_size: if single_segment {
264 None
265 } else {
266 Some(window_size)
267 },
268 };
269 let mut encoded_header = Vec::new();
270 header.serialize(&mut encoded_header);
271 self.drain_mut()
272 .and_then(|drain| drain.write_all(&encoded_header))
273 .map_err(|err| self.fail(err))?;
274
275 self.frame_started = true;
276 Ok(())
277 }
278
279 fn block_capacity(&self) -> usize {
280 let matcher_window = self.state.matcher.window_size() as usize;
281 core::cmp::max(1, core::cmp::min(matcher_window, MAX_BLOCK_SIZE as usize))
282 }
283
284 fn allocate_pending_space(&mut self, block_capacity: usize) -> Vec<u8> {
285 let mut space = match self.compression_level {
286 CompressionLevel::Fastest
287 | CompressionLevel::Default
288 | CompressionLevel::Better
289 | CompressionLevel::Best
290 | CompressionLevel::Level(_) => self.state.matcher.get_next_space(),
291 CompressionLevel::Uncompressed => Vec::new(),
292 };
293 space.clear();
294 if space.capacity() > block_capacity {
295 space.shrink_to(block_capacity);
296 }
297 if space.capacity() < block_capacity {
298 space.reserve(block_capacity - space.capacity());
299 }
300 space
301 }
302
303 fn emit_full_pending_block(
304 &mut self,
305 block_capacity: usize,
306 consumed: usize,
307 ) -> Option<Result<usize, Error>> {
308 if self.pending.len() != block_capacity {
309 return None;
310 }
311
312 let new_pending = self.allocate_pending_space(block_capacity);
313 let full_block = mem::replace(&mut self.pending, new_pending);
314 if let Err((err, restored_block)) = self.encode_block(full_block, false) {
315 self.pending = restored_block;
316 let err = self.fail(err);
317 if consumed > 0 {
318 return Some(Ok(consumed));
319 }
320 return Some(Err(err));
321 }
322 None
323 }
324
325 fn emit_pending_block(&mut self, last_block: bool) -> Result<(), Error> {
326 let block = mem::take(&mut self.pending);
327 if let Err((err, restored_block)) = self.encode_block(block, last_block) {
328 self.pending = restored_block;
329 return Err(self.fail(err));
330 }
331 if !last_block {
332 let block_capacity = self.block_capacity();
333 self.pending = self.allocate_pending_space(block_capacity);
334 }
335 Ok(())
336 }
337
338 fn ensure_level_supported(&self) -> Result<(), Error> {
342 match self.compression_level {
343 CompressionLevel::Uncompressed
344 | CompressionLevel::Fastest
345 | CompressionLevel::Default
346 | CompressionLevel::Better
347 | CompressionLevel::Best
348 | CompressionLevel::Level(_) => Ok(()),
349 }
350 }
351
352 fn encode_block(
353 &mut self,
354 uncompressed_data: Vec<u8>,
355 last_block: bool,
356 ) -> Result<(), (Error, Vec<u8>)> {
357 let mut raw_block = Some(uncompressed_data);
358 let mut encoded = Vec::new();
359 mem::swap(&mut encoded, &mut self.encoded_scratch);
360 encoded.clear();
361 let needed_capacity = self.block_capacity() + 3;
362 if encoded.capacity() < needed_capacity {
363 encoded.reserve(needed_capacity.saturating_sub(encoded.len()));
364 }
365 let mut moved_into_matcher = false;
366 if raw_block.as_ref().is_some_and(|block| block.is_empty()) {
367 let header = BlockHeader {
368 last_block,
369 block_type: crate::blocks::block::BlockType::Raw,
370 block_size: 0,
371 };
372 header.serialize(&mut encoded);
373 } else {
374 match self.compression_level {
375 CompressionLevel::Uncompressed => {
376 let block = raw_block.as_ref().expect("raw block missing");
377 let header = BlockHeader {
378 last_block,
379 block_type: crate::blocks::block::BlockType::Raw,
380 block_size: block.len() as u32,
381 };
382 header.serialize(&mut encoded);
383 encoded.extend_from_slice(block);
384 }
385 CompressionLevel::Fastest
386 | CompressionLevel::Default
387 | CompressionLevel::Better
388 | CompressionLevel::Best
389 | CompressionLevel::Level(_) => {
390 let block = raw_block.take().expect("raw block missing");
391 debug_assert!(!block.is_empty(), "empty blocks handled above");
392 compress_block_encoded(
393 &mut self.state,
394 self.compression_level,
395 last_block,
396 block,
397 &mut encoded,
398 );
399 moved_into_matcher = true;
400 }
401 }
402 }
403
404 if let Err(err) = self.drain_mut().and_then(|drain| drain.write_all(&encoded)) {
405 encoded.clear();
406 mem::swap(&mut encoded, &mut self.encoded_scratch);
407 let restored = if moved_into_matcher {
408 self.state.matcher.get_last_space().to_vec()
409 } else {
410 raw_block.unwrap_or_default()
411 };
412 return Err((err, restored));
413 }
414
415 if moved_into_matcher {
416 #[cfg(feature = "hash")]
417 {
418 self.hasher.write(self.state.matcher.get_last_space());
419 }
420 } else {
421 self.hash_block(raw_block.as_deref().unwrap_or(&[]));
422 }
423 encoded.clear();
424 mem::swap(&mut encoded, &mut self.encoded_scratch);
425 Ok(())
426 }
427
428 fn write_empty_last_block(&mut self) -> Result<(), Error> {
429 self.encode_block(Vec::new(), true).map_err(|(err, _)| err)
430 }
431
432 fn fail(&mut self, err: Error) -> Error {
433 self.errored = true;
434 if self.last_error_kind.is_none() {
435 self.last_error_kind = Some(err.kind());
436 }
437 if self.last_error_message.is_none() {
438 self.last_error_message = Some(err.to_string());
439 }
440 err
441 }
442
443 #[cfg(feature = "hash")]
444 fn hash_block(&mut self, uncompressed_data: &[u8]) {
445 self.hasher.write(uncompressed_data);
446 }
447
448 #[cfg(not(feature = "hash"))]
449 fn hash_block(&mut self, _uncompressed_data: &[u8]) {}
450}
451
452impl<W: Write, M: Matcher> Write for StreamingEncoder<W, M> {
453 fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
454 self.ensure_open()?;
455 if buf.is_empty() {
456 return Ok(0);
457 }
458
459 if let Some(pledged) = self.pledged_content_size
463 && self.bytes_consumed >= pledged
464 {
465 return Err(invalid_input_error(
466 "write would exceed pledged content size",
467 ));
468 }
469
470 self.ensure_frame_started()?;
471
472 let buf = if let Some(pledged) = self.pledged_content_size {
476 let remaining_allowed = pledged
477 .checked_sub(self.bytes_consumed)
478 .ok_or_else(|| invalid_input_error("bytes consumed exceed pledged content size"))?;
479 if remaining_allowed == 0 {
480 return Err(invalid_input_error(
481 "write would exceed pledged content size",
482 ));
483 }
484 let accepted = core::cmp::min(
485 buf.len(),
486 usize::try_from(remaining_allowed).unwrap_or(usize::MAX),
487 );
488 &buf[..accepted]
489 } else {
490 buf
491 };
492
493 let block_capacity = self.block_capacity();
494 if self.pending.capacity() == 0 {
495 self.pending = self.allocate_pending_space(block_capacity);
496 }
497 let mut remaining = buf;
498 let mut consumed = 0usize;
499
500 while !remaining.is_empty() {
501 if let Some(result) = self.emit_full_pending_block(block_capacity, consumed) {
502 return result;
503 }
504
505 let available = block_capacity - self.pending.len();
506 let to_take = core::cmp::min(remaining.len(), available);
507 if to_take == 0 {
508 break;
509 }
510 self.pending.extend_from_slice(&remaining[..to_take]);
511 remaining = &remaining[to_take..];
512 consumed += to_take;
513
514 if let Some(result) = self.emit_full_pending_block(block_capacity, consumed) {
515 if let Ok(n) = &result {
516 self.bytes_consumed += *n as u64;
517 }
518 return result;
519 }
520 }
521 self.bytes_consumed += consumed as u64;
522 Ok(consumed)
523 }
524
525 fn flush(&mut self) -> Result<(), Error> {
526 self.ensure_open()?;
527 if self.pending.is_empty() {
528 return self
529 .drain_mut()
530 .and_then(|drain| drain.flush())
531 .map_err(|err| self.fail(err));
532 }
533 self.ensure_frame_started()?;
534 self.emit_pending_block(false)?;
535 self.drain_mut()
536 .and_then(|drain| drain.flush())
537 .map_err(|err| self.fail(err))
538 }
539}
540
/// Builds an error that carries only `kind`, with no message payload.
fn error_from_kind(kind: ErrorKind) -> Error {
    kind.into()
}
544
545fn error_with_kind_message(kind: ErrorKind, message: String) -> Error {
546 #[cfg(feature = "std")]
547 {
548 Error::new(kind, message)
549 }
550 #[cfg(not(feature = "std"))]
551 {
552 Error::new(kind, alloc::boxed::Box::new(message))
553 }
554}
555
556fn invalid_input_error(message: &str) -> Error {
557 #[cfg(feature = "std")]
558 {
559 Error::new(ErrorKind::InvalidInput, message)
560 }
561 #[cfg(not(feature = "std"))]
562 {
563 Error::new(
564 ErrorKind::Other,
565 alloc::boxed::Box::new(alloc::string::String::from(message)),
566 )
567 }
568}
569
570fn other_error_owned(message: String) -> Error {
571 #[cfg(feature = "std")]
572 {
573 Error::other(message)
574 }
575 #[cfg(not(feature = "std"))]
576 {
577 Error::new(ErrorKind::Other, alloc::boxed::Box::new(message))
578 }
579}
580
581fn other_error(message: &str) -> Error {
582 #[cfg(feature = "std")]
583 {
584 Error::other(message)
585 }
586 #[cfg(not(feature = "std"))]
587 {
588 Error::new(
589 ErrorKind::Other,
590 alloc::boxed::Box::new(alloc::string::String::from(message)),
591 )
592 }
593}
594
#[cfg(test)]
mod tests {
    use crate::decoding::StreamingDecoder;
    use crate::encoding::{CompressionLevel, Matcher, Sequence, StreamingEncoder};
    use crate::io::{Error, ErrorKind, Read, Write};
    use alloc::vec;
    use alloc::vec::Vec;

    /// Minimal matcher with a fixed, caller-chosen window that never finds matches: each
    /// committed block is reported as one literals sequence. A small window gives tiny block
    /// sizes, which makes block boundaries easy to hit in tests.
    struct TinyMatcher {
        last_space: Vec<u8>,
        window_size: u64,
    }

    impl TinyMatcher {
        fn new(window_size: u64) -> Self {
            Self {
                last_space: Vec::new(),
                window_size,
            }
        }
    }

    impl Matcher for TinyMatcher {
        // Zero-filled buffer of `window_size` bytes; the encoder clears it and keeps only its
        // capacity.
        fn get_next_space(&mut self) -> Vec<u8> {
            vec![0; self.window_size as usize]
        }

        fn get_last_space(&mut self) -> &[u8] {
            self.last_space.as_slice()
        }

        fn commit_space(&mut self, space: Vec<u8>) {
            self.last_space = space;
        }

        fn skip_matching(&mut self) {}

        // No match finding: the whole last block is emitted as literals.
        fn start_matching(&mut self, mut handle_sequence: impl for<'a> FnMut(Sequence<'a>)) {
            handle_sequence(Sequence::Literals {
                literals: self.last_space.as_slice(),
            });
        }

        fn reset(&mut self, _level: CompressionLevel) {
            self.last_space.clear();
        }

        fn window_size(&self) -> u64 {
            self.window_size
        }
    }

    /// Drain that fails the `fail_on_write_number`-th `write` call (1-based) and records
    /// every other write in `sink`.
    struct FailingWriteOnce {
        writes: usize,
        fail_on_write_number: usize,
        sink: Vec<u8>,
    }

    impl FailingWriteOnce {
        fn new(fail_on_write_number: usize) -> Self {
            Self {
                writes: 0,
                fail_on_write_number,
                sink: Vec::new(),
            }
        }
    }

    impl Write for FailingWriteOnce {
        fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
            self.writes += 1;
            if self.writes == self.fail_on_write_number {
                return Err(super::other_error("injected write failure"));
            }
            self.sink.extend_from_slice(buf);
            Ok(buf.len())
        }

        fn flush(&mut self) -> Result<(), Error> {
            Ok(())
        }
    }

    /// Drain that fails the `fail_on_write_number`-th `write` call with a specific `kind`;
    /// other writes are accepted and discarded.
    struct FailingWithKind {
        writes: usize,
        fail_on_write_number: usize,
        kind: ErrorKind,
    }

    impl FailingWithKind {
        fn new(fail_on_write_number: usize, kind: ErrorKind) -> Self {
            Self {
                writes: 0,
                fail_on_write_number,
                kind,
            }
        }
    }

    impl Write for FailingWithKind {
        fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
            self.writes += 1;
            if self.writes == self.fail_on_write_number {
                return Err(Error::from(self.kind));
            }
            Ok(buf.len())
        }

        fn flush(&mut self) -> Result<(), Error> {
            Ok(())
        }
    }

    /// Drain whose `fail_on_write_number`-th write accepts only `partial_prefix_len` bytes;
    /// every write after that fails. With a zero-length prefix, that write fails outright.
    /// This exercises a failure in the middle of `write_all`.
    struct PartialThenFailWriter {
        writes: usize,
        fail_on_write_number: usize,
        partial_prefix_len: usize,
        terminal_failure: bool,
        sink: Vec<u8>,
    }

    impl PartialThenFailWriter {
        fn new(fail_on_write_number: usize, partial_prefix_len: usize) -> Self {
            Self {
                writes: 0,
                fail_on_write_number,
                partial_prefix_len,
                terminal_failure: false,
                sink: Vec::new(),
            }
        }
    }

    impl Write for PartialThenFailWriter {
        fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
            if self.terminal_failure {
                return Err(super::other_error("injected terminal write failure"));
            }

            self.writes += 1;
            if self.writes == self.fail_on_write_number {
                let written = core::cmp::min(self.partial_prefix_len, buf.len());
                if written > 0 {
                    self.sink.extend_from_slice(&buf[..written]);
                    self.terminal_failure = true;
                    return Ok(written);
                }
                return Err(super::other_error("injected terminal write failure"));
            }

            self.sink.extend_from_slice(buf);
            Ok(buf.len())
        }

        fn flush(&mut self) -> Result<(), Error> {
            Ok(())
        }
    }

    // Many small writes that straddle block boundaries must round-trip.
    #[test]
    fn streaming_encoder_roundtrip_multiple_writes() {
        let payload = b"streaming-encoder-roundtrip-".repeat(1024);
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        for chunk in payload.chunks(313) {
            encoder.write_all(chunk).unwrap();
        }
        let compressed = encoder.finish().unwrap();

        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }

    // flush() pushes the frame header and a partial block to the drain before finish().
    #[test]
    fn flush_emits_nonempty_partial_output() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.write_all(b"partial-block").unwrap();
        encoder.flush().unwrap();
        let flushed_len = encoder.get_ref().len();
        assert!(
            flushed_len > 0,
            "flush should emit header+partial block bytes"
        );
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, b"partial-block");
    }

    // With nothing buffered, flush() only flushes the drain; the header stays lazy.
    #[test]
    fn flush_without_writes_does_not_emit_frame_header() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.flush().unwrap();
        assert!(encoder.get_ref().is_empty());
    }

    // With a 4-byte window, writing exactly 4 bytes fills a block and must emit it within the
    // same call, while 3 bytes stay buffered.
    #[test]
    fn block_boundary_write_emits_block_in_same_call() {
        let mut boundary = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            Vec::new(),
            CompressionLevel::Uncompressed,
        );
        let mut below = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            Vec::new(),
            CompressionLevel::Uncompressed,
        );

        boundary.write_all(b"ABCD").unwrap();
        below.write_all(b"ABC").unwrap();

        let boundary_len = boundary.get_ref().len();
        let below_len = below.get_ref().len();
        assert!(
            boundary_len > below_len,
            "full block should be emitted immediately at block boundary"
        );
    }

    #[test]
    fn finish_consumes_encoder_and_emits_frame() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.write_all(b"abc").unwrap();
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, b"abc");
    }

    // finish() without any data still yields a valid, decodable empty frame.
    #[test]
    fn finish_without_writes_emits_empty_frame() {
        let encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert!(decoded.is_empty());
    }

    #[test]
    fn write_empty_buffer_returns_zero() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        assert_eq!(encoder.write(&[]).unwrap(), 0);
        let _ = encoder.finish().unwrap();
    }

    // Raw-block path: chunk sizes that do not divide the block size.
    #[test]
    fn uncompressed_level_roundtrip() {
        let payload = b"uncompressed-streaming-roundtrip".repeat(64);
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Uncompressed);
        for chunk in payload.chunks(41) {
            encoder.write_all(chunk).unwrap();
        }
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }

    #[test]
    fn better_level_streaming_roundtrip() {
        let payload = b"better-level-streaming-test".repeat(256);
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Better);
        for chunk in payload.chunks(53) {
            encoder.write_all(chunk).unwrap();
        }
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }

    // A matcher reporting a zero window is rejected when the frame starts.
    #[test]
    fn zero_window_matcher_returns_invalid_input_error() {
        let mut encoder = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(0),
            Vec::new(),
            CompressionLevel::Fastest,
        );
        let err = encoder.write_all(b"payload").unwrap_err();
        assert_eq!(err.kind(), ErrorKind::InvalidInput);
    }

    #[test]
    fn best_level_streaming_roundtrip() {
        let payload = b"best-level-streaming-test".repeat(8 * 1024);
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Best);
        for chunk in payload.chunks(53) {
            encoder.write_all(chunk).unwrap();
        }
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }

    // The very first drain write (the frame header) fails; every later call must keep
    // failing, and nothing reaches the sink.
    #[test]
    fn write_failure_poisoning_is_sticky() {
        let mut encoder = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            FailingWriteOnce::new(1),
            CompressionLevel::Uncompressed,
        );

        assert!(encoder.write_all(b"ABCD").is_err());
        assert!(encoder.flush().is_err());
        assert!(encoder.write_all(b"EFGH").is_err());
        assert_eq!(encoder.get_ref().sink.len(), 0);
        assert!(encoder.finish().is_err());
    }

    // The sticky error replays the kind of the original failure.
    #[test]
    fn poisoned_encoder_returns_original_error_kind() {
        let mut encoder = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            FailingWithKind::new(1, ErrorKind::BrokenPipe),
            CompressionLevel::Uncompressed,
        );

        let first_error = encoder.write_all(b"ABCD").unwrap_err();
        assert_eq!(first_error.kind(), ErrorKind::BrokenPipe);

        let second_error = encoder.write_all(b"EFGH").unwrap_err();
        assert_eq!(second_error.kind(), ErrorKind::BrokenPipe);
    }

    // Drain write #1 is the header, #2 the ABCD block, and #3 (the EFGH block) fails. The
    // call still reports 8 bytes of progress, and the failure surfaces on later calls.
    #[test]
    fn write_reports_progress_but_poisoning_is_sticky_after_later_block_failure() {
        let payload = b"ABCDEFGHIJKL";
        let mut encoder = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            FailingWriteOnce::new(3),
            CompressionLevel::Uncompressed,
        );

        let first_write = encoder.write(payload).unwrap();
        assert_eq!(first_write, 8);
        assert!(encoder.write(&payload[first_write..]).is_err());
        assert!(encoder.flush().is_err());
        assert!(encoder.write_all(b"EFGH").is_err());
    }

    // Same shape as above, but the third drain write is only partially accepted before the
    // drain fails for good.
    #[test]
    fn partial_write_failure_after_progress_poisons_encoder() {
        let payload = b"ABCDEFGHIJKL";
        let mut encoder = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            PartialThenFailWriter::new(3, 1),
            CompressionLevel::Uncompressed,
        );

        let first_write = encoder.write(payload).unwrap();
        assert_eq!(first_write, 8);

        let second_write = encoder.write(&payload[first_write..]);
        assert!(second_write.is_err());
        assert!(encoder.flush().is_err());
        assert!(encoder.write_all(b"MNOP").is_err());
    }

    #[test]
    fn new_with_matcher_and_get_mut_work() {
        let matcher = TinyMatcher::new(128 * 1024);
        let mut encoder =
            StreamingEncoder::new_with_matcher(matcher, Vec::new(), CompressionLevel::Fastest);
        encoder.get_mut().extend_from_slice(b"");
        encoder.write_all(b"custom-matcher").unwrap();
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, b"custom-matcher");
    }

    // Interoperability: the reference C implementation (via the `zstd` crate) decodes our
    // output.
    #[cfg(feature = "std")]
    #[test]
    fn streaming_encoder_output_decompresses_with_c_zstd() {
        let payload = b"tenant=demo op=put key=streaming value=abcdef\n".repeat(4096);
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        for chunk in payload.chunks(1024) {
            encoder.write_all(chunk).unwrap();
        }
        let compressed = encoder.finish().unwrap();

        let mut decoded = Vec::with_capacity(payload.len());
        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }

    // A pledged size is written to the header as the frame content size.
    #[test]
    fn pledged_content_size_written_in_header() {
        let payload = b"hello world, pledged size test";
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder
            .set_pledged_content_size(payload.len() as u64)
            .unwrap();
        encoder.write_all(payload).unwrap();
        let compressed = encoder.finish().unwrap();

        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
            .unwrap()
            .0;
        assert_eq!(header.frame_content_size(), payload.len() as u64);

        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }

    // Writing fewer bytes than pledged (13 of 100) makes finish() fail.
    #[test]
    fn pledged_content_size_mismatch_returns_error() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.set_pledged_content_size(100).unwrap();
        encoder.write_all(b"short payload").unwrap();
        let err = encoder.finish().unwrap_err();
        assert_eq!(err.kind(), ErrorKind::InvalidInput);
    }

    // write_all: the first write accepts the 5 pledged bytes; the next one hits the exhausted
    // pledge.
    #[test]
    fn write_exceeding_pledge_returns_error() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.set_pledged_content_size(5).unwrap();
        let err = encoder.write_all(b"exceeds five bytes").unwrap_err();
        assert_eq!(err.kind(), ErrorKind::InvalidInput);
    }

    // A single write straddling the pledge is truncated to the pledge; the next write fails.
    #[test]
    fn write_straddling_pledge_reports_partial_progress() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.set_pledged_content_size(5).unwrap();
        assert_eq!(encoder.write(b"abcdef").unwrap(), 5);
        let err = encoder.write(b"g").unwrap_err();
        assert_eq!(err.kind(), ErrorKind::InvalidInput);
    }

    // The encoded-block staging buffer keeps its allocation (3-byte header + 64-byte raw
    // block) between blocks instead of being reallocated.
    #[test]
    fn encoded_scratch_capacity_is_reused_across_blocks() {
        let payload = vec![0xAB; 64 * 3];
        let mut encoder = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(64),
            Vec::new(),
            CompressionLevel::Uncompressed,
        );

        encoder.write_all(&payload[..64]).unwrap();
        let first_capacity = encoder.encoded_scratch.capacity();
        assert!(
            first_capacity >= 67,
            "expected encoded scratch to keep block header + payload capacity",
        );

        encoder.write_all(&payload[64..128]).unwrap();
        let second_capacity = encoder.encoded_scratch.capacity();
        assert!(
            second_capacity >= first_capacity,
            "encoded scratch capacity should be reused across block emits",
        );

        encoder.write_all(&payload[128..]).unwrap();
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }

    // Pledging after the frame has started is rejected.
    #[test]
    fn pledged_content_size_after_write_returns_error() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.write_all(b"already writing").unwrap();
        let err = encoder.set_pledged_content_size(15).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::InvalidInput);
    }

    // A source-size hint must not enlarge the advertised window; a late hint is rejected.
    #[test]
    fn source_size_hint_directly_reduces_window_header() {
        let payload = b"streaming-source-size-hint".repeat(64);

        let mut no_hint = StreamingEncoder::new(Vec::new(), CompressionLevel::from_level(11));
        no_hint.write_all(payload.as_slice()).unwrap();
        let no_hint_frame = no_hint.finish().unwrap();
        let no_hint_header = crate::decoding::frame::read_frame_header(no_hint_frame.as_slice())
            .unwrap()
            .0;
        let no_hint_window = no_hint_header.window_size().unwrap();

        let mut with_hint = StreamingEncoder::new(Vec::new(), CompressionLevel::from_level(11));
        with_hint
            .set_source_size_hint(payload.len() as u64)
            .unwrap();
        with_hint.write_all(payload.as_slice()).unwrap();
        let late_hint_err = with_hint
            .set_source_size_hint(payload.len() as u64)
            .unwrap_err();
        assert_eq!(late_hint_err.kind(), ErrorKind::InvalidInput);
        let with_hint_frame = with_hint.finish().unwrap();
        let with_hint_header =
            crate::decoding::frame::read_frame_header(with_hint_frame.as_slice())
                .unwrap()
                .0;
        let with_hint_window = with_hint_header.window_size().unwrap();

        assert!(
            with_hint_window <= no_hint_window,
            "source size hint should not increase advertised window"
        );

        let mut decoder = StreamingDecoder::new(with_hint_frame.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }

    // Frames with a pledged content size are accepted by the reference C decoder.
    #[cfg(feature = "std")]
    #[test]
    fn pledged_content_size_c_zstd_compatible() {
        let payload = b"tenant=demo op=put key=streaming value=abcdef\n".repeat(4096);
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder
            .set_pledged_content_size(payload.len() as u64)
            .unwrap();
        for chunk in payload.chunks(1024) {
            encoder.write_all(chunk).unwrap();
        }
        let compressed = encoder.finish().unwrap();

        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
            .unwrap()
            .0;
        assert_eq!(header.frame_content_size(), payload.len() as u64);

        let mut decoded = Vec::new();
        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }

    // 1320 pledged bytes is within the single-segment size range but exceeds the 1024-byte
    // matcher window, so the header must keep a window descriptor.
    #[test]
    fn single_segment_requires_pledged_to_fit_matcher_window() {
        let payload = b"streaming-window-gate-".repeat(60);
        let mut encoder = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(1024),
            Vec::new(),
            CompressionLevel::Fastest,
        );
        encoder
            .set_pledged_content_size(payload.len() as u64)
            .unwrap();
        encoder.write_all(payload.as_slice()).unwrap();
        let compressed = encoder.finish().unwrap();

        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
            .unwrap()
            .0;
        assert_eq!(header.frame_content_size(), payload.len() as u64);
        assert!(
            !header.descriptor.single_segment_flag(),
            "single-segment must stay off when pledged content size exceeds matcher window"
        );
        assert!(
            header.window_size().unwrap() >= 1024,
            "window descriptor should be present when single-segment is disabled"
        );
    }

    // Without a pledge, the frame content size field is omitted entirely (0 bytes).
    #[test]
    fn no_pledged_size_omits_fcs_from_header() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.write_all(b"no pledged size").unwrap();
        let compressed = encoder.finish().unwrap();

        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
            .unwrap()
            .0;
        assert_eq!(header.frame_content_size(), 0);
        assert_eq!(header.descriptor.frame_content_size_bytes().unwrap(), 0);
    }
}