1use alloc::format;
2use alloc::string::{String, ToString};
3use alloc::vec::Vec;
4use core::mem;
5
6use crate::common::MAX_BLOCK_SIZE;
7#[cfg(feature = "hash")]
8use core::hash::Hasher;
9#[cfg(feature = "hash")]
10use twox_hash::XxHash64;
11
12use crate::encoding::levels::compress_block_encoded;
13use crate::encoding::{
14 CompressionLevel, MatchGeneratorDriver, Matcher, block_header::BlockHeader,
15 frame_compressor::CompressState, frame_compressor::FseTables, frame_header::FrameHeader,
16};
17use crate::io::{Error, ErrorKind, Write};
18
19pub struct StreamingEncoder<W: Write, M: Matcher = MatchGeneratorDriver> {
25 drain: Option<W>,
26 compression_level: CompressionLevel,
27 state: CompressState<M>,
28 pending: Vec<u8>,
29 encoded_scratch: Vec<u8>,
30 errored: bool,
31 last_error_kind: Option<ErrorKind>,
32 last_error_message: Option<String>,
33 frame_started: bool,
34 pledged_content_size: Option<u64>,
35 bytes_consumed: u64,
36 magicless: bool,
39 #[cfg(feature = "hash")]
40 hasher: XxHash64,
41}
42
43impl<W: Write> StreamingEncoder<W, MatchGeneratorDriver> {
44 pub fn new(drain: W, compression_level: CompressionLevel) -> Self {
49 Self::new_with_matcher(
50 MatchGeneratorDriver::new(MAX_BLOCK_SIZE as usize, 1),
51 drain,
52 compression_level,
53 )
54 }
55}
56
57impl<W: Write, M: Matcher> StreamingEncoder<W, M> {
58 pub fn new_with_matcher(matcher: M, drain: W, compression_level: CompressionLevel) -> Self {
63 Self {
64 drain: Some(drain),
65 compression_level,
66 state: CompressState {
67 matcher,
68 last_huff_table: None,
69 fse_tables: FseTables::new(),
70 block_scratch: crate::encoding::blocks::CompressedBlockScratch::new(),
71 offset_hist: [1, 4, 8],
72 strategy_tag: crate::encoding::strategy::StrategyTag::for_compression_level(
73 compression_level,
74 ),
75 },
76 pending: Vec::new(),
77 encoded_scratch: Vec::new(),
78 errored: false,
79 last_error_kind: None,
80 last_error_message: None,
81 frame_started: false,
82 pledged_content_size: None,
83 bytes_consumed: 0,
84 magicless: false,
85 #[cfg(feature = "hash")]
86 hasher: XxHash64::with_seed(0),
87 }
88 }
89
90 pub fn set_magicless(&mut self, magicless: bool) -> Result<(), Error> {
98 self.ensure_open()?;
99 if self.frame_started {
100 return Err(invalid_input_error(
101 "magicless format must be set before the first write",
102 ));
103 }
104 self.magicless = magicless;
105 Ok(())
106 }
107
108 pub fn set_pledged_content_size(&mut self, size: u64) -> Result<(), Error> {
119 self.ensure_open()?;
120 if self.frame_started {
121 return Err(invalid_input_error(
122 "pledged content size must be set before the first write",
123 ));
124 }
125 self.pledged_content_size = Some(size);
126 self.state.matcher.set_source_size_hint(size);
129 Ok(())
130 }
131
132 pub fn set_source_size_hint(&mut self, size: u64) -> Result<(), Error> {
140 self.ensure_open()?;
141 if self.frame_started {
142 return Err(invalid_input_error(
143 "source size hint must be set before the first write",
144 ));
145 }
146 self.state.matcher.set_source_size_hint(size);
147 Ok(())
148 }
149
150 pub fn get_ref(&self) -> &W {
155 self.drain
156 .as_ref()
157 .expect("streaming encoder drain is present until finish consumes self")
158 }
159
160 pub fn get_mut(&mut self) -> &mut W {
168 self.drain
169 .as_mut()
170 .expect("streaming encoder drain is present until finish consumes self")
171 }
172
173 pub fn finish(mut self) -> Result<W, Error> {
178 self.ensure_open()?;
179
180 if let Some(pledged) = self.pledged_content_size
184 && self.bytes_consumed != pledged
185 {
186 return Err(invalid_input_error(
187 "pledged content size does not match bytes consumed",
188 ));
189 }
190
191 self.ensure_frame_started()?;
192
193 if self.pending.is_empty() {
194 self.write_empty_last_block()
195 .map_err(|err| self.fail(err))?;
196 } else {
197 self.emit_pending_block(true)?;
198 }
199
200 let mut drain = self
201 .drain
202 .take()
203 .expect("streaming encoder drain must be present when finishing");
204
205 #[cfg(feature = "hash")]
206 {
207 let checksum = self.hasher.finish() as u32;
208 drain
209 .write_all(&checksum.to_le_bytes())
210 .map_err(|err| self.fail(err))?;
211 }
212
213 drain.flush().map_err(|err| self.fail(err))?;
214 Ok(drain)
215 }
216
217 fn ensure_open(&self) -> Result<(), Error> {
218 if self.errored {
219 return Err(self.sticky_error());
220 }
221 Ok(())
222 }
223
224 fn sticky_error(&self) -> Error {
228 match (self.last_error_kind, self.last_error_message.as_deref()) {
229 (Some(kind), Some(message)) => error_with_kind_message(
230 kind,
231 format!(
232 "streaming encoder is in an errored state due to previous {kind:?} failure: {message}"
233 ),
234 ),
235 (Some(kind), None) => error_from_kind(kind),
236 (None, Some(message)) => other_error_owned(format!(
237 "streaming encoder is in an errored state: {message}"
238 )),
239 (None, None) => other_error("streaming encoder is in an errored state"),
240 }
241 }
242
243 fn drain_mut(&mut self) -> Result<&mut W, Error> {
244 self.drain
245 .as_mut()
246 .ok_or_else(|| other_error("streaming encoder has no active drain"))
247 }
248
249 fn ensure_frame_started(&mut self) -> Result<(), Error> {
250 if self.frame_started {
251 return Ok(());
252 }
253
254 self.ensure_level_supported()?;
255 self.state.matcher.reset(self.compression_level);
256 self.state.offset_hist = [1, 4, 8];
257 self.state.last_huff_table = None;
258 self.state.fse_tables.ll_previous = None;
259 self.state.fse_tables.ml_previous = None;
260 self.state.fse_tables.of_previous = None;
261 self.state.strategy_tag =
267 crate::encoding::strategy::StrategyTag::for_compression_level(self.compression_level);
268 #[cfg(feature = "hash")]
269 {
270 self.hasher = XxHash64::with_seed(0);
271 }
272
273 let window_size = self.state.matcher.window_size();
274 if window_size == 0 {
275 return Err(invalid_input_error(
276 "matcher reported window_size == 0, which is invalid",
277 ));
278 }
279
280 let single_segment = self
286 .pledged_content_size
287 .map(|size| (512..=(1 << 14)).contains(&size) && size <= window_size)
288 .unwrap_or(false);
289
290 let header = FrameHeader {
291 frame_content_size: self.pledged_content_size,
292 single_segment,
293 content_checksum: cfg!(feature = "hash"),
294 dictionary_id: None,
295 window_size: if single_segment {
296 None
297 } else {
298 Some(window_size)
299 },
300 magicless: self.magicless,
301 };
302 let mut encoded_header = Vec::new();
303 header.serialize(&mut encoded_header);
304 self.drain_mut()
305 .and_then(|drain| drain.write_all(&encoded_header))
306 .map_err(|err| self.fail(err))?;
307
308 self.frame_started = true;
309 Ok(())
310 }
311
312 fn block_capacity(&self) -> usize {
313 let matcher_window = self.state.matcher.window_size() as usize;
314 core::cmp::max(1, core::cmp::min(matcher_window, MAX_BLOCK_SIZE as usize))
315 }
316
317 fn allocate_pending_space(&mut self, block_capacity: usize) -> Vec<u8> {
318 let mut space = match self.compression_level {
319 CompressionLevel::Fastest
320 | CompressionLevel::Default
321 | CompressionLevel::Better
322 | CompressionLevel::Best
323 | CompressionLevel::Level(_) => self.state.matcher.get_next_space(),
324 CompressionLevel::Uncompressed => Vec::new(),
325 };
326 space.clear();
327 if space.capacity() > block_capacity {
328 space.shrink_to(block_capacity);
329 }
330 if space.capacity() < block_capacity {
331 space.reserve(block_capacity - space.capacity());
332 }
333 space
334 }
335
336 fn emit_full_pending_block(
337 &mut self,
338 block_capacity: usize,
339 consumed: usize,
340 ) -> Option<Result<usize, Error>> {
341 if self.pending.len() != block_capacity {
342 return None;
343 }
344
345 let new_pending = self.allocate_pending_space(block_capacity);
346 let full_block = mem::replace(&mut self.pending, new_pending);
347 if let Err((err, restored_block)) = self.encode_block(full_block, false) {
348 self.pending = restored_block;
349 let err = self.fail(err);
350 if consumed > 0 {
351 return Some(Ok(consumed));
352 }
353 return Some(Err(err));
354 }
355 None
356 }
357
358 fn emit_pending_block(&mut self, last_block: bool) -> Result<(), Error> {
359 let block = mem::take(&mut self.pending);
360 if let Err((err, restored_block)) = self.encode_block(block, last_block) {
361 self.pending = restored_block;
362 return Err(self.fail(err));
363 }
364 if !last_block {
365 let block_capacity = self.block_capacity();
366 self.pending = self.allocate_pending_space(block_capacity);
367 }
368 Ok(())
369 }
370
371 fn ensure_level_supported(&self) -> Result<(), Error> {
375 match self.compression_level {
376 CompressionLevel::Uncompressed
377 | CompressionLevel::Fastest
378 | CompressionLevel::Default
379 | CompressionLevel::Better
380 | CompressionLevel::Best
381 | CompressionLevel::Level(_) => Ok(()),
382 }
383 }
384
385 fn encode_block(
386 &mut self,
387 uncompressed_data: Vec<u8>,
388 last_block: bool,
389 ) -> Result<(), (Error, Vec<u8>)> {
390 let mut raw_block = Some(uncompressed_data);
391 let mut encoded = Vec::new();
392 mem::swap(&mut encoded, &mut self.encoded_scratch);
393 encoded.clear();
394 let needed_capacity = self.block_capacity() + 3;
395 if encoded.capacity() < needed_capacity {
396 encoded.reserve(needed_capacity.saturating_sub(encoded.len()));
397 }
398 let mut moved_into_matcher = false;
399 if raw_block.as_ref().is_some_and(|block| block.is_empty()) {
400 let header = BlockHeader {
401 last_block,
402 block_type: crate::blocks::block::BlockType::Raw,
403 block_size: 0,
404 };
405 header.serialize(&mut encoded);
406 } else {
407 match self.compression_level {
408 CompressionLevel::Uncompressed => {
409 let block = raw_block.as_ref().expect("raw block missing");
410 let header = BlockHeader {
411 last_block,
412 block_type: crate::blocks::block::BlockType::Raw,
413 block_size: block.len() as u32,
414 };
415 header.serialize(&mut encoded);
416 encoded.extend_from_slice(block);
417 }
418 CompressionLevel::Fastest
419 | CompressionLevel::Default
420 | CompressionLevel::Better
421 | CompressionLevel::Best
422 | CompressionLevel::Level(_) => {
423 let block = raw_block.take().expect("raw block missing");
424 debug_assert!(!block.is_empty(), "empty blocks handled above");
425 compress_block_encoded(
426 &mut self.state,
427 self.compression_level,
428 last_block,
429 block,
430 &mut encoded,
431 );
432 moved_into_matcher = true;
433 }
434 }
435 }
436
437 if let Err(err) = self.drain_mut().and_then(|drain| drain.write_all(&encoded)) {
438 encoded.clear();
439 mem::swap(&mut encoded, &mut self.encoded_scratch);
440 let restored = if moved_into_matcher {
441 self.state.matcher.get_last_space().to_vec()
442 } else {
443 raw_block.unwrap_or_default()
444 };
445 return Err((err, restored));
446 }
447
448 if moved_into_matcher {
449 #[cfg(feature = "hash")]
450 {
451 self.hasher.write(self.state.matcher.get_last_space());
452 }
453 } else {
454 self.hash_block(raw_block.as_deref().unwrap_or(&[]));
455 }
456 encoded.clear();
457 mem::swap(&mut encoded, &mut self.encoded_scratch);
458 Ok(())
459 }
460
461 fn write_empty_last_block(&mut self) -> Result<(), Error> {
462 self.encode_block(Vec::new(), true).map_err(|(err, _)| err)
463 }
464
465 fn fail(&mut self, err: Error) -> Error {
466 self.errored = true;
467 if self.last_error_kind.is_none() {
468 self.last_error_kind = Some(err.kind());
469 }
470 if self.last_error_message.is_none() {
471 self.last_error_message = Some(err.to_string());
472 }
473 err
474 }
475
476 #[cfg(feature = "hash")]
477 fn hash_block(&mut self, uncompressed_data: &[u8]) {
478 self.hasher.write(uncompressed_data);
479 }
480
481 #[cfg(not(feature = "hash"))]
482 fn hash_block(&mut self, _uncompressed_data: &[u8]) {}
483}
484
485impl<W: Write, M: Matcher> Write for StreamingEncoder<W, M> {
486 fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
487 self.ensure_open()?;
488 if buf.is_empty() {
489 return Ok(0);
490 }
491
492 if let Some(pledged) = self.pledged_content_size
496 && self.bytes_consumed >= pledged
497 {
498 return Err(invalid_input_error(
499 "write would exceed pledged content size",
500 ));
501 }
502
503 self.ensure_frame_started()?;
504
505 let buf = if let Some(pledged) = self.pledged_content_size {
509 let remaining_allowed = pledged
510 .checked_sub(self.bytes_consumed)
511 .ok_or_else(|| invalid_input_error("bytes consumed exceed pledged content size"))?;
512 if remaining_allowed == 0 {
513 return Err(invalid_input_error(
514 "write would exceed pledged content size",
515 ));
516 }
517 let accepted = core::cmp::min(
518 buf.len(),
519 usize::try_from(remaining_allowed).unwrap_or(usize::MAX),
520 );
521 &buf[..accepted]
522 } else {
523 buf
524 };
525
526 let block_capacity = self.block_capacity();
527 if self.pending.capacity() == 0 {
528 self.pending = self.allocate_pending_space(block_capacity);
529 }
530 let mut remaining = buf;
531 let mut consumed = 0usize;
532
533 while !remaining.is_empty() {
534 if let Some(result) = self.emit_full_pending_block(block_capacity, consumed) {
535 return result;
536 }
537
538 let available = block_capacity - self.pending.len();
539 let to_take = core::cmp::min(remaining.len(), available);
540 if to_take == 0 {
541 break;
542 }
543 self.pending.extend_from_slice(&remaining[..to_take]);
544 remaining = &remaining[to_take..];
545 consumed += to_take;
546
547 if let Some(result) = self.emit_full_pending_block(block_capacity, consumed) {
548 if let Ok(n) = &result {
549 self.bytes_consumed += *n as u64;
550 }
551 return result;
552 }
553 }
554 self.bytes_consumed += consumed as u64;
555 Ok(consumed)
556 }
557
558 fn flush(&mut self) -> Result<(), Error> {
559 self.ensure_open()?;
560 if self.pending.is_empty() {
561 return self
562 .drain_mut()
563 .and_then(|drain| drain.flush())
564 .map_err(|err| self.fail(err));
565 }
566 self.ensure_frame_started()?;
567 self.emit_pending_block(false)?;
568 self.drain_mut()
569 .and_then(|drain| drain.flush())
570 .map_err(|err| self.fail(err))
571 }
572}
573
/// Builds an error that carries only `kind`, with no message payload.
fn error_from_kind(kind: ErrorKind) -> Error {
    kind.into()
}
577
578fn error_with_kind_message(kind: ErrorKind, message: String) -> Error {
579 #[cfg(feature = "std")]
580 {
581 Error::new(kind, message)
582 }
583 #[cfg(not(feature = "std"))]
584 {
585 Error::new(kind, alloc::boxed::Box::new(message))
586 }
587}
588
589fn invalid_input_error(message: &str) -> Error {
590 #[cfg(feature = "std")]
591 {
592 Error::new(ErrorKind::InvalidInput, message)
593 }
594 #[cfg(not(feature = "std"))]
595 {
596 Error::new(
597 ErrorKind::Other,
598 alloc::boxed::Box::new(alloc::string::String::from(message)),
599 )
600 }
601}
602
603fn other_error_owned(message: String) -> Error {
604 #[cfg(feature = "std")]
605 {
606 Error::other(message)
607 }
608 #[cfg(not(feature = "std"))]
609 {
610 Error::new(ErrorKind::Other, alloc::boxed::Box::new(message))
611 }
612}
613
614fn other_error(message: &str) -> Error {
615 #[cfg(feature = "std")]
616 {
617 Error::other(message)
618 }
619 #[cfg(not(feature = "std"))]
620 {
621 Error::new(
622 ErrorKind::Other,
623 alloc::boxed::Box::new(alloc::string::String::from(message)),
624 )
625 }
626}
627
628#[cfg(test)]
629mod tests {
630 use crate::decoding::StreamingDecoder;
631 use crate::encoding::{CompressionLevel, Matcher, Sequence, StreamingEncoder};
632 use crate::io::{Error, ErrorKind, Read, Write};
633 use alloc::vec;
634 use alloc::vec::Vec;
635
636 struct TinyMatcher {
637 last_space: Vec<u8>,
638 window_size: u64,
639 }
640
641 impl TinyMatcher {
642 fn new(window_size: u64) -> Self {
643 Self {
644 last_space: Vec::new(),
645 window_size,
646 }
647 }
648 }
649
650 impl Matcher for TinyMatcher {
651 fn get_next_space(&mut self) -> Vec<u8> {
652 vec![0; self.window_size as usize]
653 }
654
655 fn get_last_space(&mut self) -> &[u8] {
656 self.last_space.as_slice()
657 }
658
659 fn commit_space(&mut self, space: Vec<u8>) {
660 self.last_space = space;
661 }
662
663 fn skip_matching(&mut self) {}
664
665 fn start_matching(&mut self, mut handle_sequence: impl for<'a> FnMut(Sequence<'a>)) {
666 handle_sequence(Sequence::Literals {
667 literals: self.last_space.as_slice(),
668 });
669 }
670
671 fn reset(&mut self, _level: CompressionLevel) {
672 self.last_space.clear();
673 }
674
675 fn window_size(&self) -> u64 {
676 self.window_size
677 }
678 }
679
680 struct FailingWriteOnce {
681 writes: usize,
682 fail_on_write_number: usize,
683 sink: Vec<u8>,
684 }
685
686 impl FailingWriteOnce {
687 fn new(fail_on_write_number: usize) -> Self {
688 Self {
689 writes: 0,
690 fail_on_write_number,
691 sink: Vec::new(),
692 }
693 }
694 }
695
696 impl Write for FailingWriteOnce {
697 fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
698 self.writes += 1;
699 if self.writes == self.fail_on_write_number {
700 return Err(super::other_error("injected write failure"));
701 }
702 self.sink.extend_from_slice(buf);
703 Ok(buf.len())
704 }
705
706 fn flush(&mut self) -> Result<(), Error> {
707 Ok(())
708 }
709 }
710
711 struct FailingWithKind {
712 writes: usize,
713 fail_on_write_number: usize,
714 kind: ErrorKind,
715 }
716
717 impl FailingWithKind {
718 fn new(fail_on_write_number: usize, kind: ErrorKind) -> Self {
719 Self {
720 writes: 0,
721 fail_on_write_number,
722 kind,
723 }
724 }
725 }
726
727 impl Write for FailingWithKind {
728 fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
729 self.writes += 1;
730 if self.writes == self.fail_on_write_number {
731 return Err(Error::from(self.kind));
732 }
733 Ok(buf.len())
734 }
735
736 fn flush(&mut self) -> Result<(), Error> {
737 Ok(())
738 }
739 }
740
741 struct PartialThenFailWriter {
742 writes: usize,
743 fail_on_write_number: usize,
744 partial_prefix_len: usize,
745 terminal_failure: bool,
746 sink: Vec<u8>,
747 }
748
749 impl PartialThenFailWriter {
750 fn new(fail_on_write_number: usize, partial_prefix_len: usize) -> Self {
751 Self {
752 writes: 0,
753 fail_on_write_number,
754 partial_prefix_len,
755 terminal_failure: false,
756 sink: Vec::new(),
757 }
758 }
759 }
760
761 impl Write for PartialThenFailWriter {
762 fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
763 if self.terminal_failure {
764 return Err(super::other_error("injected terminal write failure"));
765 }
766
767 self.writes += 1;
768 if self.writes == self.fail_on_write_number {
769 let written = core::cmp::min(self.partial_prefix_len, buf.len());
770 if written > 0 {
771 self.sink.extend_from_slice(&buf[..written]);
772 self.terminal_failure = true;
773 return Ok(written);
774 }
775 return Err(super::other_error("injected terminal write failure"));
776 }
777
778 self.sink.extend_from_slice(buf);
779 Ok(buf.len())
780 }
781
782 fn flush(&mut self) -> Result<(), Error> {
783 Ok(())
784 }
785 }
786
#[test]
/// A magicless frame omits the magic prefix and still decodes with a
/// magicless-configured frame decoder.
fn streaming_encoder_set_magicless_before_write_omits_magic_and_roundtrips() {
    use crate::common::MAGIC_NUM;
    use crate::decoding::{BlockDecodingStrategy, FrameDecoder};

    let input = b"streaming-magicless-roundtrip-".repeat(64);

    let mut enc = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
    enc.set_magicless(true).expect("set_magicless pre-write");
    enc.write_all(&input).unwrap();
    let frame = enc.finish().unwrap();

    let magic = MAGIC_NUM.to_le_bytes();
    assert!(
        !frame.starts_with(&magic),
        "magicless frame must omit the 4-byte magic prefix",
    );

    let mut frame_decoder = FrameDecoder::new();
    frame_decoder.set_magicless(true);
    let mut reader: &[u8] = frame.as_slice();
    frame_decoder.init(&mut reader).expect("magicless init");
    frame_decoder
        .decode_blocks(&mut reader, BlockDecodingStrategy::All)
        .expect("decode_blocks");

    let mut output: Vec<u8> = Vec::new();
    frame_decoder
        .collect_to_writer(&mut output)
        .expect("collect_to_writer");
    assert_eq!(output, input);
}
820
#[test]
/// Changing the magicless setting after the header was written is rejected.
fn streaming_encoder_set_magicless_after_first_write_errors() {
    let mut enc = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
    enc.write_all(b"first-block").unwrap();

    let outcome = enc.set_magicless(true);
    let err = outcome.expect_err("set_magicless after first write must error");
    assert_eq!(
        err.kind(),
        ErrorKind::InvalidInput,
        "expected InvalidInput when setting magicless after frame_started, got {err:?}",
    );
}
838
#[test]
/// Input fed in odd-sized pieces decodes back to the original bytes.
fn streaming_encoder_roundtrip_multiple_writes() {
    let input = b"streaming-encoder-roundtrip-".repeat(1024);

    let mut enc = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
    input
        .chunks(313)
        .for_each(|piece| enc.write_all(piece).unwrap());
    let frame = enc.finish().unwrap();

    let mut output = Vec::new();
    let mut dec = StreamingDecoder::new(frame.as_slice()).unwrap();
    dec.read_to_end(&mut output).unwrap();
    assert_eq!(output, input);
}
853
#[test]
/// Flushing a partially filled block pushes bytes to the drain, and the
/// finished frame still decodes.
fn flush_emits_nonempty_partial_output() {
    let mut enc = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
    enc.write_all(b"partial-block").unwrap();
    enc.flush().unwrap();

    let bytes_after_flush = enc.get_ref().len();
    assert!(
        bytes_after_flush > 0,
        "flush should emit header+partial block bytes"
    );

    let frame = enc.finish().unwrap();
    let mut output = Vec::new();
    let mut dec = StreamingDecoder::new(frame.as_slice()).unwrap();
    dec.read_to_end(&mut output).unwrap();
    assert_eq!(output, b"partial-block");
}
870
#[test]
/// Flushing an untouched encoder leaves the drain empty.
fn flush_without_writes_does_not_emit_frame_header() {
    let mut enc = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
    enc.flush().unwrap();
    let drain = enc.get_ref();
    assert!(drain.is_empty());
}
877
#[test]
/// Filling a block exactly emits it within the same `write_all` call, while
/// one byte less keeps the data buffered.
fn block_boundary_write_emits_block_in_same_call() {
    let make_encoder = || {
        StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            Vec::new(),
            CompressionLevel::Uncompressed,
        )
    };
    let mut boundary = make_encoder();
    let mut below = make_encoder();

    boundary.write_all(b"ABCD").unwrap();
    below.write_all(b"ABC").unwrap();

    let emitted_at_boundary = boundary.get_ref().len();
    let emitted_below = below.get_ref().len();
    assert!(
        emitted_at_boundary > emitted_below,
        "full block should be emitted immediately at block boundary"
    );
}
901
#[test]
/// `finish` returns the drain holding a frame that decodes to the input.
fn finish_consumes_encoder_and_emits_frame() {
    let mut enc = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
    enc.write_all(b"abc").unwrap();
    let frame = enc.finish().unwrap();

    let mut output = Vec::new();
    let mut dec = StreamingDecoder::new(frame.as_slice()).unwrap();
    dec.read_to_end(&mut output).unwrap();
    assert_eq!(output, b"abc");
}
912
#[test]
/// Finishing without any input yields a frame that decodes to nothing.
fn finish_without_writes_emits_empty_frame() {
    let frame = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest)
        .finish()
        .unwrap();

    let mut output = Vec::new();
    let mut dec = StreamingDecoder::new(frame.as_slice()).unwrap();
    dec.read_to_end(&mut output).unwrap();
    assert!(output.is_empty());
}
922
#[test]
/// Writing an empty slice reports zero bytes and leaves the encoder usable.
fn write_empty_buffer_returns_zero() {
    let mut enc = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
    let accepted = enc.write(&[]).unwrap();
    assert_eq!(accepted, 0);
    let _ = enc.finish().unwrap();
}
929
#[test]
/// The uncompressed level produces a frame that decodes to the input.
fn uncompressed_level_roundtrip() {
    let input = b"uncompressed-streaming-roundtrip".repeat(64);

    let mut enc = StreamingEncoder::new(Vec::new(), CompressionLevel::Uncompressed);
    input
        .chunks(41)
        .for_each(|piece| enc.write_all(piece).unwrap());
    let frame = enc.finish().unwrap();

    let mut output = Vec::new();
    let mut dec = StreamingDecoder::new(frame.as_slice()).unwrap();
    dec.read_to_end(&mut output).unwrap();
    assert_eq!(output, input);
}
943
#[test]
/// The `Better` level produces a frame that decodes to the input.
fn better_level_streaming_roundtrip() {
    let input = b"better-level-streaming-test".repeat(256);

    let mut enc = StreamingEncoder::new(Vec::new(), CompressionLevel::Better);
    input
        .chunks(53)
        .for_each(|piece| enc.write_all(piece).unwrap());
    let frame = enc.finish().unwrap();

    let mut output = Vec::new();
    let mut dec = StreamingDecoder::new(frame.as_slice()).unwrap();
    dec.read_to_end(&mut output).unwrap();
    assert_eq!(output, input);
}
957
#[test]
/// A matcher reporting a zero window makes the first write fail with
/// `InvalidInput`.
fn zero_window_matcher_returns_invalid_input_error() {
    let matcher = TinyMatcher::new(0);
    let mut enc =
        StreamingEncoder::new_with_matcher(matcher, Vec::new(), CompressionLevel::Fastest);

    let failure = enc.write_all(b"payload").unwrap_err();
    assert_eq!(failure.kind(), ErrorKind::InvalidInput);
}
968
#[test]
/// The `Best` level produces a frame that decodes to the input.
fn best_level_streaming_roundtrip() {
    let input = b"best-level-streaming-test".repeat(8 * 1024);

    let mut enc = StreamingEncoder::new(Vec::new(), CompressionLevel::Best);
    input
        .chunks(53)
        .for_each(|piece| enc.write_all(piece).unwrap());
    let frame = enc.finish().unwrap();

    let mut output = Vec::new();
    let mut dec = StreamingDecoder::new(frame.as_slice()).unwrap();
    dec.read_to_end(&mut output).unwrap();
    assert_eq!(output, input);
}
984
#[test]
/// After the very first drain write fails, every later operation fails too
/// and nothing reaches the sink.
fn write_failure_poisoning_is_sticky() {
    let drain = FailingWriteOnce::new(1);
    let mut enc = StreamingEncoder::new_with_matcher(
        TinyMatcher::new(4),
        drain,
        CompressionLevel::Uncompressed,
    );

    assert!(enc.write_all(b"ABCD").is_err());
    assert!(enc.flush().is_err());
    assert!(enc.write_all(b"EFGH").is_err());
    assert_eq!(enc.get_ref().sink.len(), 0);
    assert!(enc.finish().is_err());
}
999
#[test]
/// The sticky error keeps the kind of the failure that poisoned the encoder.
fn poisoned_encoder_returns_original_error_kind() {
    let drain = FailingWithKind::new(1, ErrorKind::BrokenPipe);
    let mut enc = StreamingEncoder::new_with_matcher(
        TinyMatcher::new(4),
        drain,
        CompressionLevel::Uncompressed,
    );

    let initial = enc.write_all(b"ABCD").unwrap_err();
    assert_eq!(initial.kind(), ErrorKind::BrokenPipe);

    let repeated = enc.write_all(b"EFGH").unwrap_err();
    assert_eq!(repeated.kind(), ErrorKind::BrokenPipe);
}
1014
#[test]
/// A failure on a later block still reports the bytes accepted by that call,
/// but the encoder is poisoned afterwards.
fn write_reports_progress_but_poisoning_is_sticky_after_later_block_failure() {
    let input = b"ABCDEFGHIJKL";
    let drain = FailingWriteOnce::new(3);
    let mut enc = StreamingEncoder::new_with_matcher(
        TinyMatcher::new(4),
        drain,
        CompressionLevel::Uncompressed,
    );

    let accepted = enc.write(input).unwrap();
    assert_eq!(accepted, 8);

    let leftover = &input[accepted..];
    assert!(enc.write(leftover).is_err());
    assert!(enc.flush().is_err());
    assert!(enc.write_all(b"EFGH").is_err());
}
1030
#[test]
/// A drain that accepts only part of a block and then fails also poisons
/// the encoder.
fn partial_write_failure_after_progress_poisons_encoder() {
    let input = b"ABCDEFGHIJKL";
    let drain = PartialThenFailWriter::new(3, 1);
    let mut enc = StreamingEncoder::new_with_matcher(
        TinyMatcher::new(4),
        drain,
        CompressionLevel::Uncompressed,
    );

    let accepted = enc.write(input).unwrap();
    assert_eq!(accepted, 8);

    let retry = enc.write(&input[accepted..]);
    assert!(retry.is_err());
    assert!(enc.flush().is_err());
    assert!(enc.write_all(b"MNOP").is_err());
}
1048
#[test]
/// A custom matcher can be plugged in and the drain is reachable via `get_mut`.
fn new_with_matcher_and_get_mut_work() {
    let mut enc = StreamingEncoder::new_with_matcher(
        TinyMatcher::new(128 * 1024),
        Vec::new(),
        CompressionLevel::Fastest,
    );
    enc.get_mut().extend_from_slice(b"");
    enc.write_all(b"custom-matcher").unwrap();
    let frame = enc.finish().unwrap();

    let mut output = Vec::new();
    let mut dec = StreamingDecoder::new(frame.as_slice()).unwrap();
    dec.read_to_end(&mut output).unwrap();
    assert_eq!(output, b"custom-matcher");
}
1062
#[cfg(feature = "std")]
#[test]
/// Frames from the streaming encoder are accepted by the `zstd` crate's decoder.
fn streaming_encoder_output_decompresses_with_c_zstd() {
    let input = b"tenant=demo op=put key=streaming value=abcdef\n".repeat(4096);

    let mut enc = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
    input
        .chunks(1024)
        .for_each(|piece| enc.write_all(piece).unwrap());
    let frame = enc.finish().unwrap();

    let mut output = Vec::with_capacity(input.len());
    zstd::stream::copy_decode(frame.as_slice(), &mut output).unwrap();
    assert_eq!(output, input);
}
1077
#[test]
/// The pledged size shows up as the frame content size and the frame decodes.
fn pledged_content_size_written_in_header() {
    let input = b"hello world, pledged size test";
    let pledged = input.len() as u64;

    let mut enc = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
    enc.set_pledged_content_size(pledged).unwrap();
    enc.write_all(input).unwrap();
    let frame = enc.finish().unwrap();

    let (header, _) = crate::decoding::frame::read_frame_header(frame.as_slice()).unwrap();
    assert_eq!(header.frame_content_size(), input.len() as u64);

    let mut output = Vec::new();
    let mut dec = StreamingDecoder::new(frame.as_slice()).unwrap();
    dec.read_to_end(&mut output).unwrap();
    assert_eq!(output, input);
}
1100
#[test]
/// Finishing with fewer bytes than pledged is rejected with `InvalidInput`.
fn pledged_content_size_mismatch_returns_error() {
    let mut enc = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
    enc.set_pledged_content_size(100).unwrap();
    enc.write_all(b"short payload").unwrap();

    let failure = enc.finish().unwrap_err();
    assert_eq!(failure.kind(), ErrorKind::InvalidInput);
}
1109
#[test]
/// `write_all` with more data than pledged ends in an `InvalidInput` error.
fn write_exceeding_pledge_returns_error() {
    let mut enc = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
    enc.set_pledged_content_size(5).unwrap();

    let failure = enc.write_all(b"exceeds five bytes").unwrap_err();
    assert_eq!(failure.kind(), ErrorKind::InvalidInput);
}
1117
#[test]
/// A write crossing the pledge accepts only the allowed prefix; the next
/// write fails with `InvalidInput`.
fn write_straddling_pledge_reports_partial_progress() {
    let mut enc = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
    enc.set_pledged_content_size(5).unwrap();

    let accepted = enc.write(b"abcdef").unwrap();
    assert_eq!(accepted, 5);

    let failure = enc.write(b"g").unwrap_err();
    assert_eq!(failure.kind(), ErrorKind::InvalidInput);
}
1128
#[test]
/// The encoded-output scratch buffer keeps its capacity from one block to
/// the next.
fn encoded_scratch_capacity_is_reused_across_blocks() {
    let input = vec![0xAB; 64 * 3];
    let (first_block, remainder) = input.split_at(64);
    let (second_block, third_block) = remainder.split_at(64);

    let mut enc = StreamingEncoder::new_with_matcher(
        TinyMatcher::new(64),
        Vec::new(),
        CompressionLevel::Uncompressed,
    );

    enc.write_all(first_block).unwrap();
    let capacity_after_first = enc.encoded_scratch.capacity();
    assert!(
        capacity_after_first >= 67,
        "expected encoded scratch to keep block header + payload capacity",
    );

    enc.write_all(second_block).unwrap();
    let capacity_after_second = enc.encoded_scratch.capacity();
    assert!(
        capacity_after_second >= capacity_after_first,
        "encoded scratch capacity should be reused across block emits",
    );

    enc.write_all(third_block).unwrap();
    let frame = enc.finish().unwrap();

    let mut output = Vec::new();
    let mut dec = StreamingDecoder::new(frame.as_slice()).unwrap();
    dec.read_to_end(&mut output).unwrap();
    assert_eq!(output, input);
}
1159
#[test]
/// Pledging a size after data was written is rejected with `InvalidInput`.
fn pledged_content_size_after_write_returns_error() {
    let mut enc = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
    enc.write_all(b"already writing").unwrap();

    let failure = enc.set_pledged_content_size(15).unwrap_err();
    assert_eq!(failure.kind(), ErrorKind::InvalidInput);
}
1167
#[test]
/// A source-size hint never enlarges the advertised window, is rejected once
/// writing has begun, and the hinted frame still decodes.
fn source_size_hint_directly_reduces_window_header() {
    use crate::decoding::frame::read_frame_header;

    let input = b"streaming-source-size-hint".repeat(64);
    let hint = input.len() as u64;

    // Baseline: no hint.
    let mut plain = StreamingEncoder::new(Vec::new(), CompressionLevel::from_level(11));
    plain.write_all(input.as_slice()).unwrap();
    let plain_frame = plain.finish().unwrap();
    let (plain_header, _) = read_frame_header(plain_frame.as_slice()).unwrap();
    let plain_window = plain_header.window_size().unwrap();

    // Same input with the hint set before the first write.
    let mut hinted = StreamingEncoder::new(Vec::new(), CompressionLevel::from_level(11));
    hinted.set_source_size_hint(hint).unwrap();
    hinted.write_all(input.as_slice()).unwrap();
    let late_hint_err = hinted.set_source_size_hint(hint).unwrap_err();
    assert_eq!(late_hint_err.kind(), ErrorKind::InvalidInput);
    let hinted_frame = hinted.finish().unwrap();
    let (hinted_header, _) = read_frame_header(hinted_frame.as_slice()).unwrap();
    let hinted_window = hinted_header.window_size().unwrap();

    assert!(
        hinted_window <= plain_window,
        "source size hint should not increase advertised window"
    );

    let mut output = Vec::new();
    let mut dec = StreamingDecoder::new(hinted_frame.as_slice()).unwrap();
    dec.read_to_end(&mut output).unwrap();
    assert_eq!(output, input);
}
1206
/// A frame written with a pledged content size records that size in its header and is
/// decodable by the `zstd` crate's `copy_decode`.
#[cfg(feature = "std")]
#[test]
fn pledged_content_size_c_zstd_compatible() {
    let payload = b"tenant=demo op=put key=streaming value=abcdef\n".repeat(4096);
    let pledged = payload.len() as u64;

    let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
    encoder.set_pledged_content_size(pledged).unwrap();
    // Feed the payload in 1 KiB pieces so it arrives over many writes.
    payload
        .chunks(1024)
        .for_each(|piece| encoder.write_all(piece).unwrap());
    let frame = encoder.finish().unwrap();

    // The header must carry exactly the pledged size.
    let (header, _) = crate::decoding::frame::read_frame_header(frame.as_slice()).unwrap();
    assert_eq!(header.frame_content_size(), pledged);

    // Decode with the `zstd` crate and compare against the input.
    let mut reference_output = Vec::new();
    zstd::stream::copy_decode(frame.as_slice(), &mut reference_output).unwrap();
    assert_eq!(reference_output, payload);
}
1231
/// With a pledged size larger than the 1024 passed to `TinyMatcher::new`, the frame header
/// must keep the single-segment flag off and advertise a window of at least 1024.
#[test]
fn single_segment_requires_pledged_to_fit_matcher_window() {
    // 22 bytes * 60 = 1320 bytes, which is more than 1024.
    let payload = b"streaming-window-gate-".repeat(60);
    let pledged = payload.len() as u64;

    let mut encoder = StreamingEncoder::new_with_matcher(
        TinyMatcher::new(1024),
        Vec::new(),
        CompressionLevel::Fastest,
    );
    encoder.set_pledged_content_size(pledged).unwrap();
    encoder.write_all(payload.as_slice()).unwrap();
    let frame = encoder.finish().unwrap();

    let (header, _) = crate::decoding::frame::read_frame_header(frame.as_slice()).unwrap();
    assert_eq!(header.frame_content_size(), pledged);

    let single_segment = header.descriptor.single_segment_flag();
    assert!(
        !single_segment,
        "single-segment must stay off when pledged content size exceeds matcher window"
    );

    let window = header.window_size().unwrap();
    assert!(
        window >= 1024,
        "window descriptor should be present when single-segment is disabled"
    );
}
1259
/// Plants a wrong `strategy_tag` in the encoder state and checks that the first write
/// restores the tag derived from the encoder's compression level.
#[test]
fn ensure_frame_started_refreshes_stale_strategy_tag_at_reset() {
    use crate::encoding::strategy::StrategyTag;

    let levels = [
        CompressionLevel::Fastest,
        CompressionLevel::Default,
        CompressionLevel::Better,
        CompressionLevel::Best,
    ];

    for level in levels {
        let legitimate = StrategyTag::for_compression_level(level);
        let mut encoder = StreamingEncoder::new(Vec::new(), level);

        // The planted tag only proves something if it differs from the correct one.
        let stale = StrategyTag::BtUltra2;
        assert_ne!(
            legitimate, stale,
            "sentinel must differ from the legitimate tag at level {level:?}",
        );

        // Corrupt the state, then start the frame with a one-byte write.
        encoder.state.strategy_tag = stale;
        encoder.write_all(b"x").unwrap();
        assert_eq!(
            encoder.state.strategy_tag, legitimate,
            "reset-time strategy_tag sync missing at level {level:?}: \
             sentinel survived `ensure_frame_started`",
        );

        // Finish the frame; only success matters, the output is discarded.
        let _ = encoder.finish().unwrap();
    }
}
1305
/// Without a pledged size the frame header reports a content size of 0 and a
/// content-size field width of 0 bytes.
#[test]
fn no_pledged_size_omits_fcs_from_header() {
    let mut enc = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
    enc.write_all(b"no pledged size").unwrap();
    let frame = enc.finish().unwrap();

    let (header, _) = crate::decoding::frame::read_frame_header(frame.as_slice()).unwrap();

    // Both the reported size and the width of the size field must be zero.
    assert_eq!(header.frame_content_size(), 0);
    let fcs_field_bytes = header.descriptor.frame_content_size_bytes().unwrap();
    assert_eq!(fcs_field_bytes, 0);
}
1321}