1use alloc::format;
2use alloc::string::{String, ToString};
3use alloc::vec::Vec;
4use core::mem;
5
6use crate::common::MAX_BLOCK_SIZE;
7#[cfg(feature = "hash")]
8use core::hash::Hasher;
9#[cfg(feature = "hash")]
10use twox_hash::XxHash64;
11
12use crate::encoding::levels::compress_block_encoded;
13use crate::encoding::{
14 CompressionLevel, EncoderDictionary, MatchGeneratorDriver, Matcher, block_header::BlockHeader,
15 frame_compressor::CachedDictionaryEntropy, frame_compressor::CompressState,
16 frame_compressor::FseTables, frame_compressor::PreviousFseTable, frame_header::FrameHeader,
17};
18use crate::io::{Error, ErrorKind, Write};
19
/// Streaming encoder that compresses everything written to it into a single frame.
///
/// Input is buffered in `pending` until a full block is available; each full block is
/// encoded and written to the drain immediately. The frame header is written lazily, on the
/// first non-empty `write` (or in `finish`), so every frame option must be configured before
/// that point. [`StreamingEncoder::finish`] emits the last block and the optional content
/// checksum, then hands the drain back.
///
/// Once writing to the drain fails, the encoder is poisoned: every later operation returns
/// an error derived from the first failure.
pub struct StreamingEncoder<W: Write, M: Matcher = MatchGeneratorDriver> {
    /// Writer that receives the encoded frame; `Some` until `finish` takes it back.
    drain: Option<W>,
    /// Level used to choose between raw and compressed blocks and to reset the matcher.
    compression_level: CompressionLevel,
    /// Matcher plus the entropy tables and repeat offsets carried from block to block.
    state: CompressState<M>,
    /// Uncompressed bytes accepted by `write` but not yet emitted as a block.
    pending: Vec<u8>,
    /// Reusable output buffer for one encoded block (swapped in and out by `encode_block`).
    encoded_scratch: Vec<u8>,
    /// Set by `fail`; while true, public operations return the sticky error.
    errored: bool,
    /// Kind of the first failure that poisoned the encoder.
    last_error_kind: Option<ErrorKind>,
    /// Rendered message of the first failure that poisoned the encoder.
    last_error_message: Option<String>,
    /// True once the frame header has been written to the drain.
    frame_started: bool,
    /// Optional per-block input cap, already clamped by `set_target_block_size`.
    target_block_size: Option<u32>,
    /// Declared total input size; enforced by `write`/`finish` and written to the header
    /// when `content_size_flag` is set.
    pledged_content_size: Option<u64>,
    /// Whether the frame header carries the content size field.
    content_size_flag: bool,
    /// Total number of input bytes accepted so far.
    bytes_consumed: u64,
    /// Strategy chosen through `set_parameters`; `None` derives it from the level.
    strategy_override: Option<crate::encoding::strategy::StrategyTag>,
    /// Omit the frame magic number from the header.
    magicless: bool,
    /// Append a content checksum in `finish` (only effective with the `hash` feature).
    content_checksum: bool,
    /// Dictionary attached before the first write, if any.
    dictionary: Option<EncoderDictionary>,
    /// Whether the header records the dictionary ID when a dictionary is in use.
    dictionary_id_flag: bool,
    /// Entropy tables precomputed from `dictionary` when it was attached.
    dictionary_entropy_cache: Option<CachedDictionaryEntropy>,
    /// Running XXH64 (seed 0) over the uncompressed content of the current frame.
    #[cfg(feature = "hash")]
    hasher: XxHash64,
}
78
79impl<W: Write> StreamingEncoder<W, MatchGeneratorDriver> {
80 pub fn new(drain: W, compression_level: CompressionLevel) -> Self {
85 Self::new_with_matcher(
86 MatchGeneratorDriver::new(MAX_BLOCK_SIZE as usize, 1),
87 drain,
88 compression_level,
89 )
90 }
91
92 pub fn set_parameters(
100 &mut self,
101 params: &crate::encoding::CompressionParameters,
102 ) -> Result<(), Error> {
103 self.ensure_open()?;
104 if self.frame_started {
105 return Err(invalid_input_error(
106 "compression parameters must be set before the first write",
107 ));
108 }
109 self.compression_level = params.level();
110 let overrides = params.overrides();
111 self.strategy_override = overrides.strategy.map(|s| s.tag());
114 self.state.strategy_tag = self.strategy_override.unwrap_or_else(|| {
115 crate::encoding::strategy::StrategyTag::for_compression_level(self.compression_level)
116 });
117 self.state.matcher.set_param_overrides(Some(overrides));
118 Ok(())
119 }
120}
121
122impl<W: Write, M: Matcher> StreamingEncoder<W, M> {
123 pub fn new_with_matcher(matcher: M, drain: W, compression_level: CompressionLevel) -> Self {
128 Self {
129 drain: Some(drain),
130 compression_level,
131 state: CompressState {
132 matcher,
133 last_huff_table: None,
134 huff_table_spare: None,
135 fse_tables: FseTables::new(),
136 block_scratch: crate::encoding::blocks::CompressedBlockScratch::new(),
137 offset_hist: [1, 4, 8],
138 strategy_tag: crate::encoding::strategy::StrategyTag::for_compression_level(
139 compression_level,
140 ),
141 },
142 pending: Vec::new(),
143 encoded_scratch: Vec::new(),
144 errored: false,
145 last_error_kind: None,
146 last_error_message: None,
147 frame_started: false,
148 target_block_size: None,
149 pledged_content_size: None,
150 content_size_flag: true,
151 bytes_consumed: 0,
152 strategy_override: None,
153 magicless: false,
154 content_checksum: false,
155 dictionary: None,
156 dictionary_id_flag: true,
157 dictionary_entropy_cache: None,
158 #[cfg(feature = "hash")]
159 hasher: XxHash64::with_seed(0),
160 }
161 }
162
163 pub fn set_target_block_size(&mut self, target: Option<u32>) -> Result<(), Error> {
172 self.ensure_open()?;
173 if self.frame_started {
174 return Err(invalid_input_error(
175 "the block-size target must be set before the first write",
176 ));
177 }
178 self.target_block_size = target.map(|t| {
179 t.clamp(
180 crate::common::MIN_TARGET_BLOCK_SIZE,
181 crate::common::MAX_BLOCK_SIZE,
182 )
183 });
184 Ok(())
185 }
186
187 pub fn set_content_checksum(&mut self, emit: bool) -> Result<(), Error> {
195 self.ensure_open()?;
196 if self.frame_started {
197 return Err(invalid_input_error(
198 "content checksum must be set before the first write",
199 ));
200 }
201 self.content_checksum = emit;
202 Ok(())
203 }
204
205 pub fn set_magicless(&mut self, magicless: bool) -> Result<(), Error> {
213 self.ensure_open()?;
214 if self.frame_started {
215 return Err(invalid_input_error(
216 "magicless format must be set before the first write",
217 ));
218 }
219 self.magicless = magicless;
220 Ok(())
221 }
222
223 pub fn set_pledged_content_size(&mut self, size: u64) -> Result<(), Error> {
234 self.ensure_open()?;
235 if self.frame_started {
236 return Err(invalid_input_error(
237 "pledged content size must be set before the first write",
238 ));
239 }
240 self.pledged_content_size = Some(size);
241 self.state.matcher.set_source_size_hint(size);
244 Ok(())
245 }
246
247 pub fn set_content_size_flag(&mut self, emit: bool) -> Result<(), Error> {
254 self.ensure_open()?;
255 if self.frame_started {
256 return Err(invalid_input_error(
257 "content size flag must be set before the first write",
258 ));
259 }
260 self.content_size_flag = emit;
261 Ok(())
262 }
263
264 pub fn set_source_size_hint(&mut self, size: u64) -> Result<(), Error> {
272 self.ensure_open()?;
273 if self.frame_started {
274 return Err(invalid_input_error(
275 "source size hint must be set before the first write",
276 ));
277 }
278 self.state.matcher.set_source_size_hint(size);
279 Ok(())
280 }
281
282 pub fn set_dictionary_from_bytes(&mut self, raw_dictionary: &[u8]) -> Result<(), Error> {
289 let dict = EncoderDictionary::from_bytes(raw_dictionary)
290 .map_err(|err| invalid_input_error(&alloc::format!("invalid dictionary: {err:?}")))?;
291 self.set_encoder_dictionary(dict)
292 }
293
294 pub fn set_dictionary_id_flag(&mut self, emit: bool) -> Result<(), Error> {
299 self.ensure_open()?;
300 if self.frame_started {
301 return Err(invalid_input_error(
302 "dictionary ID flag must be set before the first write",
303 ));
304 }
305 self.dictionary_id_flag = emit;
306 Ok(())
307 }
308
309 pub fn set_encoder_dictionary(&mut self, dict: EncoderDictionary) -> Result<(), Error> {
313 self.ensure_open()?;
314 if self.frame_started {
315 return Err(invalid_input_error(
316 "dictionary must be attached before the first write",
317 ));
318 }
319 let inner = &dict.inner;
320 if inner.id == 0 {
321 return Err(invalid_input_error("dictionary has a zero ID"));
322 }
323 if inner.offset_hist.contains(&0) {
324 return Err(invalid_input_error(
325 "dictionary carries a zero repeat offset",
326 ));
327 }
328 self.dictionary_entropy_cache = Some(CachedDictionaryEntropy::from_dictionary(inner));
329 self.dictionary = Some(dict);
330 Ok(())
331 }
332
333 pub fn get_ref(&self) -> &W {
338 self.drain
339 .as_ref()
340 .expect("streaming encoder drain is present until finish consumes self")
341 }
342
343 pub fn heap_size(&self) -> usize {
352 let mut total = self.state.matcher.heap_size();
353 total += self
354 .state
355 .last_huff_table
356 .as_ref()
357 .map_or(0, |table| table.heap_size());
358 total += self
359 .state
360 .huff_table_spare
361 .as_ref()
362 .map_or(0, |table| table.heap_size());
363 total += self.pending.capacity();
364 total += self.encoded_scratch.capacity();
365 total += self
366 .dictionary
367 .as_ref()
368 .map_or(0, |d| d.inner.dict_content.capacity());
369 total += self
370 .dictionary_entropy_cache
371 .as_ref()
372 .map_or(0, CachedDictionaryEntropy::heap_size);
373 total
374 }
375
376 pub fn get_mut(&mut self) -> &mut W {
384 self.drain
385 .as_mut()
386 .expect("streaming encoder drain is present until finish consumes self")
387 }
388
389 pub fn finish(mut self) -> Result<W, Error> {
394 self.ensure_open()?;
395
396 if let Some(pledged) = self.pledged_content_size
400 && self.bytes_consumed != pledged
401 {
402 return Err(invalid_input_error(
403 "pledged content size does not match bytes consumed",
404 ));
405 }
406
407 self.ensure_frame_started()?;
408
409 if self.pending.is_empty() {
410 self.write_empty_last_block()
411 .map_err(|err| self.fail(err))?;
412 } else {
413 self.emit_pending_block(true)?;
414 }
415
416 let mut drain = self
417 .drain
418 .take()
419 .expect("streaming encoder drain must be present when finishing");
420
421 #[cfg(feature = "hash")]
422 if self.content_checksum {
423 let checksum = self.hasher.finish() as u32;
424 drain
425 .write_all(&checksum.to_le_bytes())
426 .map_err(|err| self.fail(err))?;
427 }
428
429 drain.flush().map_err(|err| self.fail(err))?;
430 Ok(drain)
431 }
432
433 fn ensure_open(&self) -> Result<(), Error> {
434 if self.errored {
435 return Err(self.sticky_error());
436 }
437 Ok(())
438 }
439
440 fn sticky_error(&self) -> Error {
444 match (self.last_error_kind, self.last_error_message.as_deref()) {
445 (Some(kind), Some(message)) => error_with_kind_message(
446 kind,
447 format!(
448 "streaming encoder is in an errored state due to previous {kind:?} failure: {message}"
449 ),
450 ),
451 (Some(kind), None) => error_from_kind(kind),
452 (None, Some(message)) => other_error_owned(format!(
453 "streaming encoder is in an errored state: {message}"
454 )),
455 (None, None) => other_error("streaming encoder is in an errored state"),
456 }
457 }
458
459 fn drain_mut(&mut self) -> Result<&mut W, Error> {
460 self.drain
461 .as_mut()
462 .ok_or_else(|| other_error("streaming encoder has no active drain"))
463 }
464
    /// Writes the frame header on first use, after preparing the matcher and entropy state.
    ///
    /// Does nothing once the header has been written. Otherwise it:
    /// - resets the matcher;
    /// - installs the repeat offsets (and, when a dictionary is in use, the dictionary
    ///   content and its cached entropy tables);
    /// - recomputes the strategy and restarts the checksum hasher;
    /// - serializes the header to the drain.
    ///
    /// `frame_started` is only set after the header write succeeds.
    ///
    /// # Errors
    /// - an invalid-input error (without poisoning) if the matcher reports a zero window size;
    /// - the drain's error (which poisons the encoder) if writing the header fails.
    fn ensure_frame_started(&mut self) -> Result<(), Error> {
        if self.frame_started {
            return Ok(());
        }

        self.ensure_level_supported()?;
        // The dictionary only participates for compressing levels, and only when the
        // matcher can be primed with it.
        let use_dictionary_state =
            !matches!(self.compression_level, CompressionLevel::Uncompressed)
                && self.state.matcher.supports_dictionary_priming()
                && self.dictionary.is_some();
        if use_dictionary_state && let Some(dict) = self.dictionary.as_ref() {
            // Sent before `reset` — presumably so the reset can account for the dictionary
            // content; NOTE(review): confirm against the matcher implementation.
            self.state
                .matcher
                .set_dictionary_size_hint(dict.inner.dict_content.len());
        }
        self.state.matcher.reset(self.compression_level);
        // Repeat offsets start from the dictionary's values, or from the defaults [1, 4, 8].
        self.state.offset_hist = if use_dictionary_state {
            self.dictionary
                .as_ref()
                .map(|dict| dict.inner.offset_hist)
                .unwrap_or([1, 4, 8])
        } else {
            [1, 4, 8]
        };
        if use_dictionary_state && let Some(dict) = self.dictionary.as_ref() {
            // Feed the dictionary content to the matcher as match history.
            let offset_hist = dict.inner.offset_hist;
            self.state
                .matcher
                .prime_with_dictionary(dict.inner.dict_content.as_slice(), offset_hist);
        }
        if use_dictionary_state && let Some(cache) = self.dictionary_entropy_cache.as_ref() {
            // The first block may reuse the dictionary's Huffman and FSE tables as the
            // "previous" tables.
            self.state.last_huff_table.clone_from(&cache.huff);
            self.state
                .fse_tables
                .ll_previous
                .clone_from(&cache.ll_previous);
            self.state
                .fse_tables
                .ml_previous
                .clone_from(&cache.ml_previous);
            self.state
                .fse_tables
                .of_previous
                .clone_from(&cache.of_previous);
            // Only custom (dictionary-provided) FSE tables are passed on to the matcher.
            let ll_entropy = match cache.ll_previous.as_ref() {
                Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
                _ => None,
            };
            let ml_entropy = match cache.ml_previous.as_ref() {
                Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
                _ => None,
            };
            let of_entropy = match cache.of_previous.as_ref() {
                Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
                _ => None,
            };
            self.state.matcher.seed_dictionary_entropy(
                self.state.last_huff_table.as_ref(),
                ll_entropy,
                ml_entropy,
                of_entropy,
            );
        } else {
            // Without dictionary state the first block starts with no previous tables.
            self.state.last_huff_table = None;
            self.state.fse_tables.ll_previous = None;
            self.state.fse_tables.ml_previous = None;
            self.state.fse_tables.of_previous = None;
        }
        // An explicit strategy from `set_parameters` wins over the level's default.
        self.state.strategy_tag = self.strategy_override.unwrap_or_else(|| {
            crate::encoding::strategy::StrategyTag::for_compression_level(self.compression_level)
        });
        // The checksum covers this frame's content only.
        #[cfg(feature = "hash")]
        {
            self.hasher = XxHash64::with_seed(0);
        }

        // A zero window is a configuration error: reported without calling `fail`.
        let window_size = self.state.matcher.window_size();
        if window_size == 0 {
            return Err(invalid_input_error(
                "matcher reported window_size == 0, which is invalid",
            ));
        }

        // Single-segment mode (header carries no window size) is only used when:
        // - the exact content size is written;
        // - no dictionary is in use;
        // - the pledged size lies in 512..=16384 bytes and fits in the window.
        // NOTE(review): the rationale for these exact bounds is not visible here — confirm.
        let single_segment = self.content_size_flag
            && !use_dictionary_state
            && self
                .pledged_content_size
                .map(|size| (512..=(1 << 14)).contains(&size) && size <= window_size)
                .unwrap_or(false);

        let header = FrameHeader {
            frame_content_size: if self.content_size_flag {
                self.pledged_content_size
            } else {
                None
            },
            single_segment,
            // Only advertise a checksum when the `hash` feature can actually produce it.
            content_checksum: cfg!(feature = "hash") && self.content_checksum,
            dictionary_id: if use_dictionary_state && self.dictionary_id_flag {
                self.dictionary.as_ref().map(|dict| dict.inner.id as u64)
            } else {
                None
            },
            window_size: if single_segment {
                None
            } else {
                Some(window_size)
            },
            magicless: self.magicless,
        };
        let mut encoded_header = Vec::new();
        header.serialize(&mut encoded_header);
        // A failed header write poisons the encoder.
        self.drain_mut()
            .and_then(|drain| drain.write_all(&encoded_header))
            .map_err(|err| self.fail(err))?;

        self.frame_started = true;
        Ok(())
    }
616
617 fn block_capacity(&self) -> usize {
618 let matcher_window = self.state.matcher.window_size() as usize;
619 let ceiling = self
620 .target_block_size
621 .map_or(MAX_BLOCK_SIZE as usize, |t| t as usize);
622 core::cmp::max(1, core::cmp::min(matcher_window, ceiling))
623 }
624
625 fn allocate_pending_space(&mut self, block_capacity: usize) -> Vec<u8> {
626 let mut space = match self.compression_level {
627 CompressionLevel::Fastest
628 | CompressionLevel::Default
629 | CompressionLevel::Better
630 | CompressionLevel::Best
631 | CompressionLevel::Level(_) => self.state.matcher.get_next_space(),
632 CompressionLevel::Uncompressed => Vec::new(),
633 };
634 space.clear();
635 if space.capacity() > block_capacity {
636 space.shrink_to(block_capacity);
637 }
638 if space.capacity() < block_capacity {
639 space.reserve(block_capacity - space.capacity());
640 }
641 space
642 }
643
644 fn emit_full_pending_block(
645 &mut self,
646 block_capacity: usize,
647 consumed: usize,
648 ) -> Option<Result<usize, Error>> {
649 if self.pending.len() != block_capacity {
650 return None;
651 }
652
653 let new_pending = self.allocate_pending_space(block_capacity);
654 let full_block = mem::replace(&mut self.pending, new_pending);
655 if let Err((err, restored_block)) = self.encode_block(full_block, false) {
656 self.pending = restored_block;
657 let err = self.fail(err);
658 if consumed > 0 {
659 return Some(Ok(consumed));
660 }
661 return Some(Err(err));
662 }
663 None
664 }
665
666 fn emit_pending_block(&mut self, last_block: bool) -> Result<(), Error> {
667 let block = mem::take(&mut self.pending);
668 if let Err((err, restored_block)) = self.encode_block(block, last_block) {
669 self.pending = restored_block;
670 return Err(self.fail(err));
671 }
672 if !last_block {
673 let block_capacity = self.block_capacity();
674 self.pending = self.allocate_pending_space(block_capacity);
675 }
676 Ok(())
677 }
678
679 fn ensure_level_supported(&self) -> Result<(), Error> {
683 match self.compression_level {
684 CompressionLevel::Uncompressed
685 | CompressionLevel::Fastest
686 | CompressionLevel::Default
687 | CompressionLevel::Better
688 | CompressionLevel::Best
689 | CompressionLevel::Level(_) => Ok(()),
690 }
691 }
692
    /// Encodes `uncompressed_data` as one block and writes it to the drain.
    ///
    /// Encoding by case:
    /// - Empty input becomes a zero-length raw block.
    /// - `Uncompressed` copies the bytes into a raw block.
    /// - Compressing levels hand the bytes to `compress_block_encoded`, which moves them
    ///   into the matcher.
    ///
    /// The content hash is only updated after the drain accepted the block.
    ///
    /// # Errors
    /// On a drain error, returns the error together with the block's bytes so the caller
    /// can put them back into `pending`. When the bytes were moved into the matcher, they
    /// are copied back out of its last space. Some encoded bytes may already have reached
    /// the drain in that case.
    fn encode_block(
        &mut self,
        uncompressed_data: Vec<u8>,
        last_block: bool,
    ) -> Result<(), (Error, Vec<u8>)> {
        let mut raw_block = Some(uncompressed_data);
        // Borrow the reusable scratch buffer so steady-state encoding does not allocate.
        let mut encoded = Vec::new();
        mem::swap(&mut encoded, &mut self.encoded_scratch);
        encoded.clear();
        // Room for a full block plus, presumably, the 3-byte block header.
        let needed_capacity = self.block_capacity() + 3;
        if encoded.capacity() < needed_capacity {
            encoded.reserve(needed_capacity.saturating_sub(encoded.len()));
        }
        // Set once the compressor owns the block; its bytes must then be read back from the
        // matcher instead of `raw_block`.
        let mut moved_into_matcher = false;
        if raw_block.as_ref().is_some_and(|block| block.is_empty()) {
            // An empty block is always a zero-length raw block, regardless of level.
            let header = BlockHeader {
                last_block,
                block_type: crate::blocks::block::BlockType::Raw,
                block_size: 0,
            };
            header.serialize(&mut encoded);
        } else {
            match self.compression_level {
                CompressionLevel::Uncompressed => {
                    let block = raw_block.as_ref().expect("raw block missing");
                    let header = BlockHeader {
                        last_block,
                        block_type: crate::blocks::block::BlockType::Raw,
                        block_size: block.len() as u32,
                    };
                    header.serialize(&mut encoded);
                    encoded.extend_from_slice(block);
                }
                CompressionLevel::Fastest
                | CompressionLevel::Default
                | CompressionLevel::Better
                | CompressionLevel::Best
                | CompressionLevel::Level(_) => {
                    let block = raw_block.take().expect("raw block missing");
                    debug_assert!(!block.is_empty(), "empty blocks handled above");
                    // Same matcher-support condition `ensure_frame_started` uses to decide
                    // whether the dictionary was primed.
                    let dict_active = self.dictionary.is_some()
                        && self.state.matcher.supports_dictionary_priming();
                    compress_block_encoded(
                        &mut self.state,
                        self.compression_level,
                        last_block,
                        block,
                        &mut encoded,
                        dict_active,
                        // Extra arguments exist only with the `lsm` feature; this encoder
                        // passes `None` for them.
                        #[cfg(feature = "lsm")]
                        None,
                        #[cfg(all(feature = "lsm", feature = "hash"))]
                        None,
                    );
                    moved_into_matcher = true;
                }
            }
        }

        if let Err(err) = self.drain_mut().and_then(|drain| drain.write_all(&encoded)) {
            // Return the scratch buffer first, then recover the block's bytes for the caller.
            encoded.clear();
            mem::swap(&mut encoded, &mut self.encoded_scratch);
            let restored = if moved_into_matcher {
                self.state.matcher.get_last_space().to_vec()
            } else {
                raw_block.unwrap_or_default()
            };
            return Err((err, restored));
        }

        if moved_into_matcher {
            // The matcher now holds the block's bytes; hash them from there.
            #[cfg(feature = "hash")]
            if self.content_checksum {
                self.hasher.write(self.state.matcher.get_last_space());
            }
        } else {
            self.hash_block(raw_block.as_deref().unwrap_or(&[]));
        }
        encoded.clear();
        mem::swap(&mut encoded, &mut self.encoded_scratch);
        Ok(())
    }
783
784 fn write_empty_last_block(&mut self) -> Result<(), Error> {
785 self.encode_block(Vec::new(), true).map_err(|(err, _)| err)
786 }
787
788 fn fail(&mut self, err: Error) -> Error {
789 self.errored = true;
790 if self.last_error_kind.is_none() {
791 self.last_error_kind = Some(err.kind());
792 }
793 if self.last_error_message.is_none() {
794 self.last_error_message = Some(err.to_string());
795 }
796 err
797 }
798
799 #[cfg(feature = "hash")]
800 fn hash_block(&mut self, uncompressed_data: &[u8]) {
801 if self.content_checksum {
802 self.hasher.write(uncompressed_data);
803 }
804 }
805
806 #[cfg(not(feature = "hash"))]
807 fn hash_block(&mut self, _uncompressed_data: &[u8]) {}
808}
809
810impl<W: Write, M: Matcher> Write for StreamingEncoder<W, M> {
811 fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
812 self.ensure_open()?;
813 if buf.is_empty() {
814 return Ok(0);
815 }
816
817 if let Some(pledged) = self.pledged_content_size
821 && self.bytes_consumed >= pledged
822 {
823 return Err(invalid_input_error(
824 "write would exceed pledged content size",
825 ));
826 }
827
828 self.ensure_frame_started()?;
829
830 let buf = if let Some(pledged) = self.pledged_content_size {
834 let remaining_allowed = pledged
835 .checked_sub(self.bytes_consumed)
836 .ok_or_else(|| invalid_input_error("bytes consumed exceed pledged content size"))?;
837 if remaining_allowed == 0 {
838 return Err(invalid_input_error(
839 "write would exceed pledged content size",
840 ));
841 }
842 let accepted = core::cmp::min(
843 buf.len(),
844 usize::try_from(remaining_allowed).unwrap_or(usize::MAX),
845 );
846 &buf[..accepted]
847 } else {
848 buf
849 };
850
851 let block_capacity = self.block_capacity();
852 if self.pending.capacity() == 0 {
853 self.pending = self.allocate_pending_space(block_capacity);
854 }
855 let mut remaining = buf;
856 let mut consumed = 0usize;
857
858 while !remaining.is_empty() {
859 if let Some(result) = self.emit_full_pending_block(block_capacity, consumed) {
860 return result;
861 }
862
863 let available = block_capacity - self.pending.len();
864 let to_take = core::cmp::min(remaining.len(), available);
865 if to_take == 0 {
866 break;
867 }
868 self.pending.extend_from_slice(&remaining[..to_take]);
869 remaining = &remaining[to_take..];
870 consumed += to_take;
871
872 if let Some(result) = self.emit_full_pending_block(block_capacity, consumed) {
873 if let Ok(n) = &result {
874 self.bytes_consumed += *n as u64;
875 }
876 return result;
877 }
878 }
879 self.bytes_consumed += consumed as u64;
880 Ok(consumed)
881 }
882
883 fn flush(&mut self) -> Result<(), Error> {
884 self.ensure_open()?;
885 if self.pending.is_empty() {
886 return self
887 .drain_mut()
888 .and_then(|drain| drain.flush())
889 .map_err(|err| self.fail(err));
890 }
891 self.ensure_frame_started()?;
892 self.emit_pending_block(false)?;
893 self.drain_mut()
894 .and_then(|drain| drain.flush())
895 .map_err(|err| self.fail(err))
896 }
897}
898
/// Builds a message-less error carrying only `kind`.
fn error_from_kind(kind: ErrorKind) -> Error {
    kind.into()
}
902
903fn error_with_kind_message(kind: ErrorKind, message: String) -> Error {
904 #[cfg(feature = "std")]
905 {
906 Error::new(kind, message)
907 }
908 #[cfg(not(feature = "std"))]
909 {
910 Error::new(kind, alloc::boxed::Box::new(message))
911 }
912}
913
914fn invalid_input_error(message: &str) -> Error {
915 #[cfg(feature = "std")]
916 {
917 Error::new(ErrorKind::InvalidInput, message)
918 }
919 #[cfg(not(feature = "std"))]
920 {
921 Error::new(
922 ErrorKind::Other,
923 alloc::boxed::Box::new(alloc::string::String::from(message)),
924 )
925 }
926}
927
928fn other_error_owned(message: String) -> Error {
929 #[cfg(feature = "std")]
930 {
931 Error::other(message)
932 }
933 #[cfg(not(feature = "std"))]
934 {
935 Error::new(ErrorKind::Other, alloc::boxed::Box::new(message))
936 }
937}
938
939fn other_error(message: &str) -> Error {
940 #[cfg(feature = "std")]
941 {
942 Error::other(message)
943 }
944 #[cfg(not(feature = "std"))]
945 {
946 Error::new(
947 ErrorKind::Other,
948 alloc::boxed::Box::new(alloc::string::String::from(message)),
949 )
950 }
951}
952
953#[cfg(test)]
954mod tests {
955 use crate::decoding::StreamingDecoder;
956 use crate::encoding::{CompressionLevel, Matcher, Sequence, StreamingEncoder};
957 use crate::io::{Error, ErrorKind, Read, Write};
958 use alloc::vec;
959 use alloc::vec::Vec;
960
961 struct TinyMatcher {
962 last_space: Vec<u8>,
963 window_size: u64,
964 }
965
966 impl TinyMatcher {
967 fn new(window_size: u64) -> Self {
968 Self {
969 last_space: Vec::new(),
970 window_size,
971 }
972 }
973 }
974
975 impl Matcher for TinyMatcher {
976 fn get_next_space(&mut self) -> Vec<u8> {
977 vec![0; self.window_size as usize]
978 }
979
980 fn get_last_space(&mut self) -> &[u8] {
981 self.last_space.as_slice()
982 }
983
984 fn commit_space(&mut self, space: Vec<u8>) {
985 self.last_space = space;
986 }
987
988 fn skip_matching(&mut self) {}
989
990 fn start_matching(&mut self, mut handle_sequence: impl for<'a> FnMut(Sequence<'a>)) {
991 handle_sequence(Sequence::Literals {
992 literals: self.last_space.as_slice(),
993 });
994 }
995
996 fn reset(&mut self, _level: CompressionLevel) {
997 self.last_space.clear();
998 }
999
1000 fn window_size(&self) -> u64 {
1001 self.window_size
1002 }
1003 }
1004
1005 struct FailingWriteOnce {
1006 writes: usize,
1007 fail_on_write_number: usize,
1008 sink: Vec<u8>,
1009 }
1010
1011 impl FailingWriteOnce {
1012 fn new(fail_on_write_number: usize) -> Self {
1013 Self {
1014 writes: 0,
1015 fail_on_write_number,
1016 sink: Vec::new(),
1017 }
1018 }
1019 }
1020
1021 impl Write for FailingWriteOnce {
1022 fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
1023 self.writes += 1;
1024 if self.writes == self.fail_on_write_number {
1025 return Err(super::other_error("injected write failure"));
1026 }
1027 self.sink.extend_from_slice(buf);
1028 Ok(buf.len())
1029 }
1030
1031 fn flush(&mut self) -> Result<(), Error> {
1032 Ok(())
1033 }
1034 }
1035
1036 struct FailingWithKind {
1037 writes: usize,
1038 fail_on_write_number: usize,
1039 kind: ErrorKind,
1040 }
1041
1042 impl FailingWithKind {
1043 fn new(fail_on_write_number: usize, kind: ErrorKind) -> Self {
1044 Self {
1045 writes: 0,
1046 fail_on_write_number,
1047 kind,
1048 }
1049 }
1050 }
1051
1052 impl Write for FailingWithKind {
1053 fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
1054 self.writes += 1;
1055 if self.writes == self.fail_on_write_number {
1056 return Err(Error::from(self.kind));
1057 }
1058 Ok(buf.len())
1059 }
1060
1061 fn flush(&mut self) -> Result<(), Error> {
1062 Ok(())
1063 }
1064 }
1065
1066 struct PartialThenFailWriter {
1067 writes: usize,
1068 fail_on_write_number: usize,
1069 partial_prefix_len: usize,
1070 terminal_failure: bool,
1071 sink: Vec<u8>,
1072 }
1073
1074 impl PartialThenFailWriter {
1075 fn new(fail_on_write_number: usize, partial_prefix_len: usize) -> Self {
1076 Self {
1077 writes: 0,
1078 fail_on_write_number,
1079 partial_prefix_len,
1080 terminal_failure: false,
1081 sink: Vec::new(),
1082 }
1083 }
1084 }
1085
1086 impl Write for PartialThenFailWriter {
1087 fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
1088 if self.terminal_failure {
1089 return Err(super::other_error("injected terminal write failure"));
1090 }
1091
1092 self.writes += 1;
1093 if self.writes == self.fail_on_write_number {
1094 let written = core::cmp::min(self.partial_prefix_len, buf.len());
1095 if written > 0 {
1096 self.sink.extend_from_slice(&buf[..written]);
1097 self.terminal_failure = true;
1098 return Ok(written);
1099 }
1100 return Err(super::other_error("injected terminal write failure"));
1101 }
1102
1103 self.sink.extend_from_slice(buf);
1104 Ok(buf.len())
1105 }
1106
1107 fn flush(&mut self) -> Result<(), Error> {
1108 Ok(())
1109 }
1110 }
1111
1112 #[test]
1116 fn streaming_encoder_set_magicless_before_write_omits_magic_and_roundtrips() {
1117 use crate::common::MAGIC_NUM;
1118 let payload = b"streaming-magicless-roundtrip-".repeat(64);
1119
1120 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1121 encoder
1122 .set_magicless(true)
1123 .expect("set_magicless pre-write");
1124 encoder.write_all(&payload).unwrap();
1125 let compressed = encoder.finish().unwrap();
1126
1127 assert!(
1128 !compressed.starts_with(&MAGIC_NUM.to_le_bytes()),
1129 "magicless frame must omit the 4-byte magic prefix",
1130 );
1131
1132 let mut decoder = crate::decoding::FrameDecoder::new();
1133 decoder.set_magicless(true);
1134 let mut cursor: &[u8] = compressed.as_slice();
1135 decoder.init(&mut cursor).expect("magicless init");
1136 decoder
1137 .decode_blocks(&mut cursor, crate::decoding::BlockDecodingStrategy::All)
1138 .expect("decode_blocks");
1139 let mut decoded: Vec<u8> = Vec::new();
1140 decoder
1141 .collect_to_writer(&mut decoded)
1142 .expect("collect_to_writer");
1143 assert_eq!(decoded, payload);
1144 }
1145
1146 #[test]
1151 fn streaming_encoder_set_magicless_after_first_write_errors() {
1152 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1153 encoder.write_all(b"first-block").unwrap();
1154 let err = encoder
1155 .set_magicless(true)
1156 .expect_err("set_magicless after first write must error");
1157 assert_eq!(
1158 err.kind(),
1159 crate::io::ErrorKind::InvalidInput,
1160 "expected InvalidInput when setting magicless after frame_started, got {err:?}",
1161 );
1162 }
1163
1164 #[test]
1165 fn streaming_encoder_roundtrip_multiple_writes() {
1166 let payload = b"streaming-encoder-roundtrip-".repeat(1024);
1167 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1168 for chunk in payload.chunks(313) {
1169 encoder.write_all(chunk).unwrap();
1170 }
1171 let compressed = encoder.finish().unwrap();
1172
1173 let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1174 let mut decoded = Vec::new();
1175 decoder.read_to_end(&mut decoded).unwrap();
1176 assert_eq!(decoded, payload);
1177 }
1178
1179 #[test]
1180 fn flush_emits_nonempty_partial_output() {
1181 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1182 encoder.write_all(b"partial-block").unwrap();
1183 encoder.flush().unwrap();
1184 let flushed_len = encoder.get_ref().len();
1185 assert!(
1186 flushed_len > 0,
1187 "flush should emit header+partial block bytes"
1188 );
1189 let compressed = encoder.finish().unwrap();
1190 let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1191 let mut decoded = Vec::new();
1192 decoder.read_to_end(&mut decoded).unwrap();
1193 assert_eq!(decoded, b"partial-block");
1194 }
1195
1196 #[test]
1197 fn flush_without_writes_does_not_emit_frame_header() {
1198 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1199 encoder.flush().unwrap();
1200 assert!(encoder.get_ref().is_empty());
1201 }
1202
1203 #[test]
1204 fn block_boundary_write_emits_block_in_same_call() {
1205 let mut boundary = StreamingEncoder::new_with_matcher(
1206 TinyMatcher::new(4),
1207 Vec::new(),
1208 CompressionLevel::Uncompressed,
1209 );
1210 let mut below = StreamingEncoder::new_with_matcher(
1211 TinyMatcher::new(4),
1212 Vec::new(),
1213 CompressionLevel::Uncompressed,
1214 );
1215
1216 boundary.write_all(b"ABCD").unwrap();
1217 below.write_all(b"ABC").unwrap();
1218
1219 let boundary_len = boundary.get_ref().len();
1220 let below_len = below.get_ref().len();
1221 assert!(
1222 boundary_len > below_len,
1223 "full block should be emitted immediately at block boundary"
1224 );
1225 }
1226
1227 #[test]
1228 fn finish_consumes_encoder_and_emits_frame() {
1229 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1230 encoder.write_all(b"abc").unwrap();
1231 let compressed = encoder.finish().unwrap();
1232 let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1233 let mut decoded = Vec::new();
1234 decoder.read_to_end(&mut decoded).unwrap();
1235 assert_eq!(decoded, b"abc");
1236 }
1237
1238 #[test]
1239 fn finish_without_writes_emits_empty_frame() {
1240 let encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1241 let compressed = encoder.finish().unwrap();
1242 let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1243 let mut decoded = Vec::new();
1244 decoder.read_to_end(&mut decoded).unwrap();
1245 assert!(decoded.is_empty());
1246 }
1247
1248 #[test]
1249 fn write_empty_buffer_returns_zero() {
1250 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1251 assert_eq!(encoder.write(&[]).unwrap(), 0);
1252 let _ = encoder.finish().unwrap();
1253 }
1254
1255 #[test]
1256 fn uncompressed_level_roundtrip() {
1257 let payload = b"uncompressed-streaming-roundtrip".repeat(64);
1258 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Uncompressed);
1259 for chunk in payload.chunks(41) {
1260 encoder.write_all(chunk).unwrap();
1261 }
1262 let compressed = encoder.finish().unwrap();
1263 let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1264 let mut decoded = Vec::new();
1265 decoder.read_to_end(&mut decoded).unwrap();
1266 assert_eq!(decoded, payload);
1267 }
1268
1269 #[test]
1270 fn better_level_streaming_roundtrip() {
1271 let payload = b"better-level-streaming-test".repeat(256);
1272 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Better);
1273 for chunk in payload.chunks(53) {
1274 encoder.write_all(chunk).unwrap();
1275 }
1276 let compressed = encoder.finish().unwrap();
1277 let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1278 let mut decoded = Vec::new();
1279 decoder.read_to_end(&mut decoded).unwrap();
1280 assert_eq!(decoded, payload);
1281 }
1282
1283 #[test]
1284 fn zero_window_matcher_returns_invalid_input_error() {
1285 let mut encoder = StreamingEncoder::new_with_matcher(
1286 TinyMatcher::new(0),
1287 Vec::new(),
1288 CompressionLevel::Fastest,
1289 );
1290 let err = encoder.write_all(b"payload").unwrap_err();
1291 assert_eq!(err.kind(), ErrorKind::InvalidInput);
1292 }
1293
1294 #[test]
1295 fn best_level_streaming_roundtrip() {
1296 let payload = b"best-level-streaming-test".repeat(8 * 1024);
1299 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Best);
1300 for chunk in payload.chunks(53) {
1301 encoder.write_all(chunk).unwrap();
1302 }
1303 let compressed = encoder.finish().unwrap();
1304 let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1305 let mut decoded = Vec::new();
1306 decoder.read_to_end(&mut decoded).unwrap();
1307 assert_eq!(decoded, payload);
1308 }
1309
1310 #[test]
1311 fn write_failure_poisoning_is_sticky() {
1312 let mut encoder = StreamingEncoder::new_with_matcher(
1313 TinyMatcher::new(4),
1314 FailingWriteOnce::new(1),
1315 CompressionLevel::Uncompressed,
1316 );
1317
1318 assert!(encoder.write_all(b"ABCD").is_err());
1319 assert!(encoder.flush().is_err());
1320 assert!(encoder.write_all(b"EFGH").is_err());
1321 assert_eq!(encoder.get_ref().sink.len(), 0);
1322 assert!(encoder.finish().is_err());
1323 }
1324
1325 #[test]
1326 fn poisoned_encoder_returns_original_error_kind() {
1327 let mut encoder = StreamingEncoder::new_with_matcher(
1328 TinyMatcher::new(4),
1329 FailingWithKind::new(1, ErrorKind::BrokenPipe),
1330 CompressionLevel::Uncompressed,
1331 );
1332
1333 let first_error = encoder.write_all(b"ABCD").unwrap_err();
1334 assert_eq!(first_error.kind(), ErrorKind::BrokenPipe);
1335
1336 let second_error = encoder.write_all(b"EFGH").unwrap_err();
1337 assert_eq!(second_error.kind(), ErrorKind::BrokenPipe);
1338 }
1339
1340 #[test]
1341 fn write_reports_progress_but_poisoning_is_sticky_after_later_block_failure() {
1342 let payload = b"ABCDEFGHIJKL";
1343 let mut encoder = StreamingEncoder::new_with_matcher(
1344 TinyMatcher::new(4),
1345 FailingWriteOnce::new(3),
1346 CompressionLevel::Uncompressed,
1347 );
1348
1349 let first_write = encoder.write(payload).unwrap();
1350 assert_eq!(first_write, 8);
1351 assert!(encoder.write(&payload[first_write..]).is_err());
1352 assert!(encoder.flush().is_err());
1353 assert!(encoder.write_all(b"EFGH").is_err());
1354 }
1355
1356 #[test]
1357 fn partial_write_failure_after_progress_poisons_encoder() {
1358 let payload = b"ABCDEFGHIJKL";
1359 let mut encoder = StreamingEncoder::new_with_matcher(
1360 TinyMatcher::new(4),
1361 PartialThenFailWriter::new(3, 1),
1362 CompressionLevel::Uncompressed,
1363 );
1364
1365 let first_write = encoder.write(payload).unwrap();
1366 assert_eq!(first_write, 8);
1367
1368 let second_write = encoder.write(&payload[first_write..]);
1369 assert!(second_write.is_err());
1370 assert!(encoder.flush().is_err());
1371 assert!(encoder.write_all(b"MNOP").is_err());
1372 }
1373
1374 #[test]
1375 fn new_with_matcher_and_get_mut_work() {
1376 let matcher = TinyMatcher::new(128 * 1024);
1377 let mut encoder =
1378 StreamingEncoder::new_with_matcher(matcher, Vec::new(), CompressionLevel::Fastest);
1379 encoder.get_mut().extend_from_slice(b"");
1380 encoder.write_all(b"custom-matcher").unwrap();
1381 let compressed = encoder.finish().unwrap();
1382 let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1383 let mut decoded = Vec::new();
1384 decoder.read_to_end(&mut decoded).unwrap();
1385 assert_eq!(decoded, b"custom-matcher");
1386 }
1387
1388 #[test]
1389 fn pledged_content_size_written_in_header() {
1390 let payload = b"hello world, pledged size test";
1391 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1392 encoder
1393 .set_pledged_content_size(payload.len() as u64)
1394 .unwrap();
1395 encoder.write_all(payload).unwrap();
1396 let compressed = encoder.finish().unwrap();
1397
1398 let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1400 .unwrap()
1401 .0;
1402 assert_eq!(header.frame_content_size(), payload.len() as u64);
1403
1404 let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1406 let mut decoded = Vec::new();
1407 decoder.read_to_end(&mut decoded).unwrap();
1408 assert_eq!(decoded, payload);
1409 }
1410
1411 #[test]
1412 fn pledged_content_size_mismatch_returns_error() {
1413 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1414 encoder.set_pledged_content_size(100).unwrap();
1415 encoder.write_all(b"short payload").unwrap(); let err = encoder.finish().unwrap_err();
1417 assert_eq!(err.kind(), ErrorKind::InvalidInput);
1418 }
1419
1420 #[test]
1421 fn write_exceeding_pledge_returns_error() {
1422 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1423 encoder.set_pledged_content_size(5).unwrap();
1424 let err = encoder.write_all(b"exceeds five bytes").unwrap_err();
1425 assert_eq!(err.kind(), ErrorKind::InvalidInput);
1426 }
1427
1428 #[test]
1429 fn write_straddling_pledge_reports_partial_progress() {
1430 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1431 encoder.set_pledged_content_size(5).unwrap();
1432 assert_eq!(encoder.write(b"abcdef").unwrap(), 5);
1434 let err = encoder.write(b"g").unwrap_err();
1436 assert_eq!(err.kind(), ErrorKind::InvalidInput);
1437 }
1438
1439 #[test]
1440 fn encoded_scratch_capacity_is_reused_across_blocks() {
1441 let payload = vec![0xAB; 64 * 3];
1442 let mut encoder = StreamingEncoder::new_with_matcher(
1443 TinyMatcher::new(64),
1444 Vec::new(),
1445 CompressionLevel::Uncompressed,
1446 );
1447
1448 encoder.write_all(&payload[..64]).unwrap();
1449 let first_capacity = encoder.encoded_scratch.capacity();
1450 assert!(
1451 first_capacity >= 67,
1452 "expected encoded scratch to keep block header + payload capacity",
1453 );
1454
1455 encoder.write_all(&payload[64..128]).unwrap();
1456 let second_capacity = encoder.encoded_scratch.capacity();
1457 assert!(
1458 second_capacity >= first_capacity,
1459 "encoded scratch capacity should be reused across block emits",
1460 );
1461
1462 encoder.write_all(&payload[128..]).unwrap();
1463 let compressed = encoder.finish().unwrap();
1464 let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1465 let mut decoded = Vec::new();
1466 decoder.read_to_end(&mut decoded).unwrap();
1467 assert_eq!(decoded, payload);
1468 }
1469
1470 #[test]
1471 fn pledged_content_size_after_write_returns_error() {
1472 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1473 encoder.write_all(b"already writing").unwrap();
1474 let err = encoder.set_pledged_content_size(15).unwrap_err();
1475 assert_eq!(err.kind(), ErrorKind::InvalidInput);
1476 }
1477
1478 #[test]
1479 fn source_size_hint_directly_reduces_window_header() {
1480 let payload = b"streaming-source-size-hint".repeat(64);
1481
1482 let mut no_hint = StreamingEncoder::new(Vec::new(), CompressionLevel::from_level(11));
1483 no_hint.write_all(payload.as_slice()).unwrap();
1484 let no_hint_frame = no_hint.finish().unwrap();
1485 let no_hint_header = crate::decoding::frame::read_frame_header(no_hint_frame.as_slice())
1486 .unwrap()
1487 .0;
1488 let no_hint_window = no_hint_header.window_size().unwrap();
1489
1490 let mut with_hint = StreamingEncoder::new(Vec::new(), CompressionLevel::from_level(11));
1491 with_hint
1492 .set_source_size_hint(payload.len() as u64)
1493 .unwrap();
1494 with_hint.write_all(payload.as_slice()).unwrap();
1495 let late_hint_err = with_hint
1496 .set_source_size_hint(payload.len() as u64)
1497 .unwrap_err();
1498 assert_eq!(late_hint_err.kind(), ErrorKind::InvalidInput);
1499 let with_hint_frame = with_hint.finish().unwrap();
1500 let with_hint_header =
1501 crate::decoding::frame::read_frame_header(with_hint_frame.as_slice())
1502 .unwrap()
1503 .0;
1504 let with_hint_window = with_hint_header.window_size().unwrap();
1505
1506 assert!(
1507 with_hint_window <= no_hint_window,
1508 "source size hint should not increase advertised window"
1509 );
1510
1511 let mut decoder = StreamingDecoder::new(with_hint_frame.as_slice()).unwrap();
1512 let mut decoded = Vec::new();
1513 decoder.read_to_end(&mut decoded).unwrap();
1514 assert_eq!(decoded, payload);
1515 }
1516
1517 #[test]
1518 fn single_segment_requires_pledged_to_fit_matcher_window() {
1519 let payload = b"streaming-window-gate-".repeat(60); let mut encoder = StreamingEncoder::new_with_matcher(
1521 TinyMatcher::new(1024),
1522 Vec::new(),
1523 CompressionLevel::Fastest,
1524 );
1525 encoder
1526 .set_pledged_content_size(payload.len() as u64)
1527 .unwrap();
1528 encoder.write_all(payload.as_slice()).unwrap();
1529 let compressed = encoder.finish().unwrap();
1530
1531 let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1532 .unwrap()
1533 .0;
1534 assert_eq!(header.frame_content_size(), payload.len() as u64);
1535 assert!(
1536 !header.descriptor.single_segment_flag(),
1537 "single-segment must stay off when pledged content size exceeds matcher window"
1538 );
1539 assert!(
1540 header.window_size().unwrap() >= 1024,
1541 "window descriptor should be present when single-segment is disabled"
1542 );
1543 }
1544
1545 #[test]
1546 fn ensure_frame_started_refreshes_stale_strategy_tag_at_reset() {
1547 use crate::encoding::strategy::StrategyTag;
1563 for level in [
1564 CompressionLevel::Fastest,
1565 CompressionLevel::Default,
1566 CompressionLevel::Better,
1567 CompressionLevel::Best,
1568 ] {
1569 let expected = StrategyTag::for_compression_level(level);
1570 let mut encoder = StreamingEncoder::new(Vec::new(), level);
1571 let sentinel = StrategyTag::BtUltra2;
1576 assert_ne!(
1577 expected, sentinel,
1578 "sentinel must differ from the legitimate tag at level {level:?}",
1579 );
1580 encoder.state.strategy_tag = sentinel;
1581 encoder.write_all(b"x").unwrap();
1582 assert_eq!(
1583 encoder.state.strategy_tag, expected,
1584 "reset-time strategy_tag sync missing at level {level:?}: \
1585 sentinel survived `ensure_frame_started`",
1586 );
1587 let _ = encoder.finish().unwrap();
1588 }
1589 }
1590
1591 #[test]
1600 fn level_22_streaming_window_roundtrips_in_our_decoder() {
1601 let payload = b"level-22-streaming-window-cap-".repeat(512);
1602 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::from_level(22));
1603 for chunk in payload.chunks(101) {
1604 encoder.write_all(chunk).unwrap();
1605 }
1606 let compressed = encoder.finish().unwrap();
1607
1608 let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1611 .unwrap()
1612 .0;
1613 let window = header.window_size().unwrap();
1614 assert!(
1615 window <= crate::common::MAXIMUM_ALLOWED_WINDOW_SIZE,
1616 "L22 advertised window {window} exceeds decoder cap {}",
1617 crate::common::MAXIMUM_ALLOWED_WINDOW_SIZE,
1618 );
1619
1620 let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1621 let mut decoded = Vec::new();
1622 decoder.read_to_end(&mut decoded).unwrap();
1623 assert_eq!(decoded, payload);
1624 }
1625
1626 #[test]
1630 fn streaming_encoder_set_content_checksum_false_clears_header_flag() {
1631 let payload = b"streaming-checksum-toggle-".repeat(64);
1632 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1633 encoder
1634 .set_content_checksum(false)
1635 .expect("set_content_checksum pre-write");
1636 encoder.write_all(&payload).unwrap();
1637 let compressed = encoder.finish().unwrap();
1638
1639 let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1640 .unwrap()
1641 .0;
1642 assert!(
1643 !header.descriptor.content_checksum_flag(),
1644 "content_checksum(false) must clear the frame header flag",
1645 );
1646
1647 let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1648 let mut decoded = Vec::new();
1649 decoder.read_to_end(&mut decoded).unwrap();
1650 assert_eq!(decoded, payload);
1651 }
1652
1653 #[cfg(feature = "hash")]
1657 #[test]
1658 fn streaming_encoder_set_content_checksum_false_omits_trailer() {
1659 let payload = b"streaming-checksum-trailer-".repeat(64);
1660
1661 let mut with = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1662 with.set_content_checksum(true)
1664 .expect("set_content_checksum pre-write");
1665 with.write_all(&payload).unwrap();
1666 let with_checksum = with.finish().unwrap();
1667
1668 let mut without = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1669 without
1670 .set_content_checksum(false)
1671 .expect("set_content_checksum pre-write");
1672 without.write_all(&payload).unwrap();
1673 let without_checksum = without.finish().unwrap();
1674
1675 assert!(
1676 crate::decoding::frame::read_frame_header(with_checksum.as_slice())
1677 .unwrap()
1678 .0
1679 .descriptor
1680 .content_checksum_flag(),
1681 "default checksum-on frame must set the header flag",
1682 );
1683 assert_eq!(
1684 with_checksum.len(),
1685 without_checksum.len() + 4,
1686 "checksum-on frame must carry exactly the 4-byte XXH64 trailer",
1687 );
1688 }
1689
1690 #[test]
1695 fn streaming_encoder_set_content_checksum_after_first_write_errors() {
1696 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1697 encoder.write_all(b"first-block").unwrap();
1698 let err = encoder
1699 .set_content_checksum(false)
1700 .expect_err("set_content_checksum after first write must error");
1701 assert_eq!(
1702 err.kind(),
1703 ErrorKind::InvalidInput,
1704 "expected InvalidInput when setting content checksum after frame_started, got {err:?}",
1705 );
1706 }
1707
1708 #[test]
1709 fn no_pledged_size_omits_fcs_from_header() {
1710 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1711 encoder.write_all(b"no pledged size").unwrap();
1712 let compressed = encoder.finish().unwrap();
1713
1714 let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1716 .unwrap()
1717 .0;
1718 assert_eq!(header.frame_content_size(), 0);
1719 assert_eq!(header.descriptor.frame_content_size_bytes().unwrap(), 0);
1722 }
1723
1724 #[test]
1725 fn streaming_encoder_with_dictionary_roundtrips_and_carries_dict_id() {
1726 use alloc::format;
1727 let dict_raw = include_bytes!("../../dict_tests/dictionary");
1728 let dict_id = crate::decoding::Dictionary::decode_dict(dict_raw)
1729 .unwrap()
1730 .id;
1731
1732 let mut payload = Vec::new();
1736 for i in 0..400u32 {
1737 payload.extend_from_slice(
1738 format!("tenant=demo table=orders key={i} region=eu payload=aaaaabbbbbccccc\n")
1739 .as_bytes(),
1740 );
1741 }
1742
1743 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Level(19));
1744 encoder
1745 .set_dictionary_from_bytes(dict_raw)
1746 .expect("attach dictionary");
1747 for chunk in payload.chunks(777) {
1748 encoder.write_all(chunk).unwrap();
1749 }
1750 let compressed = encoder.finish().unwrap();
1751
1752 let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1755 .unwrap()
1756 .0;
1757 assert_eq!(header.dictionary_id(), Some(dict_id));
1758
1759 let mut decoder =
1761 StreamingDecoder::new_with_dictionary_bytes(compressed.as_slice(), dict_raw).unwrap();
1762 let mut decoded = Vec::new();
1763 decoder.read_to_end(&mut decoded).unwrap();
1764 assert_eq!(decoded, payload);
1765
1766 let mut nodict = StreamingEncoder::new(Vec::new(), CompressionLevel::Level(19));
1770 for chunk in payload.chunks(777) {
1771 nodict.write_all(chunk).unwrap();
1772 }
1773 let nodict_frame = nodict.finish().unwrap();
1774 assert!(
1775 compressed.len() <= nodict_frame.len(),
1776 "dict frame {} should not exceed no-dict frame {}",
1777 compressed.len(),
1778 nodict_frame.len()
1779 );
1780 }
1781
1782 #[test]
1783 fn streaming_encoder_strategy_override_survives_frame_start() {
1784 use crate::encoding::{CompressionParameters, Strategy};
1790 let level = CompressionLevel::Fastest;
1791 let level_tag = crate::encoding::strategy::StrategyTag::for_compression_level(level);
1792 let override_tag = Strategy::Greedy.tag();
1793 assert_ne!(
1794 level_tag, override_tag,
1795 "test needs an override that changes the derived tag"
1796 );
1797
1798 let params = CompressionParameters::builder(level)
1799 .strategy(Strategy::Greedy)
1800 .build()
1801 .unwrap();
1802 let payload = b"override must outlive the frame header";
1803 let mut encoder = StreamingEncoder::new(Vec::new(), level);
1804 encoder.set_parameters(¶ms).unwrap();
1805 encoder.write_all(payload).unwrap();
1806 assert_eq!(
1807 encoder.state.strategy_tag, override_tag,
1808 "strategy override was discarded when the frame started"
1809 );
1810
1811 let compressed = encoder.finish().unwrap();
1812 let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1813 let mut decoded = Vec::new();
1814 decoder.read_to_end(&mut decoded).unwrap();
1815 assert_eq!(decoded, payload);
1816 }
1817
1818 #[test]
1819 fn streaming_encoder_uncompressed_with_dictionary_omits_dict_id() {
1820 let dict_raw = include_bytes!("../../dict_tests/dictionary");
1826 let payload = b"tenant=demo table=orders region=eu payload=aaaaabbbbbccccc";
1827 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Uncompressed);
1828 encoder
1829 .set_dictionary_from_bytes(dict_raw)
1830 .expect("attach dictionary");
1831 encoder.write_all(payload).unwrap();
1832 let compressed = encoder.finish().unwrap();
1833
1834 let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1835 .unwrap()
1836 .0;
1837 assert_eq!(
1838 header.dictionary_id(),
1839 None,
1840 "uncompressed frame must not require a dictionary at decode time"
1841 );
1842
1843 let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1845 let mut decoded = Vec::new();
1846 decoder.read_to_end(&mut decoded).unwrap();
1847 assert_eq!(decoded, payload);
1848 }
1849}