1use alloc::format;
2use alloc::string::{String, ToString};
3use alloc::vec::Vec;
4use core::mem;
5
6use crate::common::MAX_BLOCK_SIZE;
7#[cfg(feature = "hash")]
8use core::hash::Hasher;
9#[cfg(feature = "hash")]
10use twox_hash::XxHash64;
11
12use crate::encoding::levels::compress_block_encoded;
13use crate::encoding::{
14 CompressionLevel, EncoderDictionary, MatchGeneratorDriver, Matcher, block_header::BlockHeader,
15 frame_compressor::CachedDictionaryEntropy, frame_compressor::CompressState,
16 frame_compressor::FseTables, frame_compressor::PreviousFseTable, frame_header::FrameHeader,
17};
18use crate::io::{Error, ErrorKind, Write};
19
/// Incremental Zstandard frame encoder that implements [`Write`].
///
/// Bytes written through [`Write`] are buffered into blocks, encoded with the
/// matcher `M`, and written to the wrapped drain `W`. Call
/// [`StreamingEncoder::finish`] to terminate the frame and get the drain back.
///
/// Any drain I/O failure poisons the encoder: every later operation returns an
/// error that carries the kind (and message) of the first failure.
pub struct StreamingEncoder<W: Write, M: Matcher = MatchGeneratorDriver> {
    // Output sink; `Some` until `finish` takes it.
    drain: Option<W>,
    // Level used for matcher reset and block encoding.
    compression_level: CompressionLevel,
    // Matcher plus entropy tables / repeat offsets carried from block to block.
    state: CompressState<M>,
    // Uncompressed bytes buffered for the next block.
    pending: Vec<u8>,
    // Reusable output buffer for one encoded block (swapped in/out by `encode_block`).
    encoded_scratch: Vec<u8>,
    // Set by `fail`; once true, public operations return the sticky error.
    errored: bool,
    // Kind of the first recorded failure.
    last_error_kind: Option<ErrorKind>,
    // Display text of the first recorded failure.
    last_error_message: Option<String>,
    // True once the frame header has been written to the drain.
    frame_started: bool,
    // Optional per-block size cap (already clamped by `set_target_block_size`).
    target_block_size: Option<u32>,
    // Exact uncompressed size promised by the caller, enforced by `write`/`finish`.
    pledged_content_size: Option<u64>,
    // Advisory size used for tuning only; never enforced or written to the header.
    source_size_hint: Option<u64>,
    // Whether the pledged size is recorded in the frame header.
    content_size_flag: bool,
    // Uncompressed bytes accepted so far via `write`.
    bytes_consumed: u64,
    // Strategy chosen through `set_parameters`, overriding the level's default.
    strategy_override: Option<crate::encoding::strategy::StrategyTag>,
    // Emit the frame without the magic-number prefix.
    magicless: bool,
    // Append a content checksum (effective only with the `hash` feature).
    content_checksum: bool,
    // Dictionary attached before the first write, if any.
    dictionary: Option<EncoderDictionary>,
    // Whether the dictionary ID is written to the frame header.
    dictionary_id_flag: bool,
    // Entropy tables derived from `dictionary`, computed once when attached.
    dictionary_entropy_cache: Option<CachedDictionaryEntropy>,
    // Running hash of uncompressed bytes of successfully written blocks.
    #[cfg(feature = "hash")]
    hasher: XxHash64,
}
84
85impl<W: Write> StreamingEncoder<W, MatchGeneratorDriver> {
86 pub fn new(drain: W, compression_level: CompressionLevel) -> Self {
91 Self::new_with_matcher(
92 MatchGeneratorDriver::new(MAX_BLOCK_SIZE as usize, 1),
93 drain,
94 compression_level,
95 )
96 }
97
98 pub fn set_parameters(
106 &mut self,
107 params: &crate::encoding::CompressionParameters,
108 ) -> Result<(), Error> {
109 self.ensure_open()?;
110 if self.frame_started {
111 return Err(invalid_input_error(
112 "compression parameters must be set before the first write",
113 ));
114 }
115 self.compression_level = params.level();
116 let overrides = params.overrides();
117 self.strategy_override = overrides.strategy.map(|s| s.tag());
120 self.state.strategy_tag = self.strategy_override.unwrap_or_else(|| {
121 crate::encoding::strategy::StrategyTag::for_compression_level(self.compression_level)
122 });
123 self.state.huf_optimal_search = crate::encoding::frame_compressor::fast_huf_search_enabled(
124 self.state.strategy_tag,
125 self.pledged_content_size.or(self.source_size_hint),
126 );
127 self.state.matcher.set_param_overrides(Some(overrides));
128 Ok(())
129 }
130}
131
132impl<W: Write, M: Matcher> StreamingEncoder<W, M> {
133 pub fn new_with_matcher(matcher: M, drain: W, compression_level: CompressionLevel) -> Self {
138 Self {
139 drain: Some(drain),
140 compression_level,
141 state: CompressState {
142 matcher,
143 last_huff_table: None,
144 huff_table_spare: None,
145 fse_tables: FseTables::new(),
146 block_scratch: crate::encoding::blocks::CompressedBlockScratch::new(),
147 offset_hist: [1, 4, 8],
148 strategy_tag: crate::encoding::strategy::StrategyTag::for_compression_level(
149 compression_level,
150 ),
151 huf_optimal_search: true,
152 },
153 pending: Vec::new(),
154 encoded_scratch: Vec::new(),
155 errored: false,
156 last_error_kind: None,
157 last_error_message: None,
158 frame_started: false,
159 target_block_size: None,
160 pledged_content_size: None,
161 source_size_hint: None,
162 content_size_flag: true,
163 bytes_consumed: 0,
164 strategy_override: None,
165 magicless: false,
166 content_checksum: false,
167 dictionary: None,
168 dictionary_id_flag: true,
169 dictionary_entropy_cache: None,
170 #[cfg(feature = "hash")]
171 hasher: XxHash64::with_seed(0),
172 }
173 }
174
175 pub fn set_target_block_size(&mut self, target: Option<u32>) -> Result<(), Error> {
184 self.ensure_open()?;
185 if self.frame_started {
186 return Err(invalid_input_error(
187 "the block-size target must be set before the first write",
188 ));
189 }
190 self.target_block_size = target.map(|t| {
191 t.clamp(
192 crate::common::MIN_TARGET_BLOCK_SIZE,
193 crate::common::MAX_BLOCK_SIZE,
194 )
195 });
196 Ok(())
197 }
198
199 pub fn set_content_checksum(&mut self, emit: bool) -> Result<(), Error> {
207 self.ensure_open()?;
208 if self.frame_started {
209 return Err(invalid_input_error(
210 "content checksum must be set before the first write",
211 ));
212 }
213 self.content_checksum = emit;
214 Ok(())
215 }
216
217 pub fn set_magicless(&mut self, magicless: bool) -> Result<(), Error> {
225 self.ensure_open()?;
226 if self.frame_started {
227 return Err(invalid_input_error(
228 "magicless format must be set before the first write",
229 ));
230 }
231 self.magicless = magicless;
232 Ok(())
233 }
234
235 pub fn set_pledged_content_size(&mut self, size: u64) -> Result<(), Error> {
246 self.ensure_open()?;
247 if self.frame_started {
248 return Err(invalid_input_error(
249 "pledged content size must be set before the first write",
250 ));
251 }
252 self.pledged_content_size = Some(size);
253 self.state.matcher.set_source_size_hint(size);
256 Ok(())
257 }
258
259 pub fn set_content_size_flag(&mut self, emit: bool) -> Result<(), Error> {
266 self.ensure_open()?;
267 if self.frame_started {
268 return Err(invalid_input_error(
269 "content size flag must be set before the first write",
270 ));
271 }
272 self.content_size_flag = emit;
273 Ok(())
274 }
275
276 pub fn set_source_size_hint(&mut self, size: u64) -> Result<(), Error> {
284 self.ensure_open()?;
285 if self.frame_started {
286 return Err(invalid_input_error(
287 "source size hint must be set before the first write",
288 ));
289 }
290 self.state.matcher.set_source_size_hint(size);
291 self.source_size_hint = Some(size);
296 Ok(())
297 }
298
299 pub fn set_dictionary_from_bytes(&mut self, raw_dictionary: &[u8]) -> Result<(), Error> {
306 let dict = EncoderDictionary::from_bytes(raw_dictionary)
307 .map_err(|err| invalid_input_error(&alloc::format!("invalid dictionary: {err:?}")))?;
308 self.set_encoder_dictionary(dict)
309 }
310
311 pub fn set_dictionary_id_flag(&mut self, emit: bool) -> Result<(), Error> {
316 self.ensure_open()?;
317 if self.frame_started {
318 return Err(invalid_input_error(
319 "dictionary ID flag must be set before the first write",
320 ));
321 }
322 self.dictionary_id_flag = emit;
323 Ok(())
324 }
325
326 pub fn set_encoder_dictionary(&mut self, dict: EncoderDictionary) -> Result<(), Error> {
330 self.ensure_open()?;
331 if self.frame_started {
332 return Err(invalid_input_error(
333 "dictionary must be attached before the first write",
334 ));
335 }
336 let inner = &dict.inner;
337 if inner.id == 0 {
338 return Err(invalid_input_error("dictionary has a zero ID"));
339 }
340 if inner.offset_hist.contains(&0) {
341 return Err(invalid_input_error(
342 "dictionary carries a zero repeat offset",
343 ));
344 }
345 self.dictionary_entropy_cache = Some(CachedDictionaryEntropy::from_dictionary(inner));
346 self.dictionary = Some(dict);
347 Ok(())
348 }
349
350 pub fn get_ref(&self) -> &W {
355 self.drain
356 .as_ref()
357 .expect("streaming encoder drain is present until finish consumes self")
358 }
359
360 pub fn heap_size(&self) -> usize {
369 let mut total = self.state.matcher.heap_size();
370 total += self
371 .state
372 .last_huff_table
373 .as_ref()
374 .map_or(0, |table| table.heap_size());
375 total += self
376 .state
377 .huff_table_spare
378 .as_ref()
379 .map_or(0, |table| table.heap_size());
380 total += self.pending.capacity();
381 total += self.encoded_scratch.capacity();
382 total += self
383 .dictionary
384 .as_ref()
385 .map_or(0, |d| d.inner.dict_content.capacity());
386 total += self
387 .dictionary_entropy_cache
388 .as_ref()
389 .map_or(0, CachedDictionaryEntropy::heap_size);
390 total
391 }
392
393 pub fn get_mut(&mut self) -> &mut W {
401 self.drain
402 .as_mut()
403 .expect("streaming encoder drain is present until finish consumes self")
404 }
405
406 pub fn finish(mut self) -> Result<W, Error> {
411 self.ensure_open()?;
412
413 if let Some(pledged) = self.pledged_content_size
417 && self.bytes_consumed != pledged
418 {
419 return Err(invalid_input_error(
420 "pledged content size does not match bytes consumed",
421 ));
422 }
423
424 self.ensure_frame_started()?;
425
426 if self.pending.is_empty() {
427 self.write_empty_last_block()
428 .map_err(|err| self.fail(err))?;
429 } else {
430 self.emit_pending_block(true)?;
431 }
432
433 let mut drain = self
434 .drain
435 .take()
436 .expect("streaming encoder drain must be present when finishing");
437
438 #[cfg(feature = "hash")]
439 if self.content_checksum {
440 let checksum = self.hasher.finish() as u32;
441 drain
442 .write_all(&checksum.to_le_bytes())
443 .map_err(|err| self.fail(err))?;
444 }
445
446 drain.flush().map_err(|err| self.fail(err))?;
447 Ok(drain)
448 }
449
450 fn ensure_open(&self) -> Result<(), Error> {
451 if self.errored {
452 return Err(self.sticky_error());
453 }
454 Ok(())
455 }
456
457 fn sticky_error(&self) -> Error {
461 match (self.last_error_kind, self.last_error_message.as_deref()) {
462 (Some(kind), Some(message)) => error_with_kind_message(
463 kind,
464 format!(
465 "streaming encoder is in an errored state due to previous {kind:?} failure: {message}"
466 ),
467 ),
468 (Some(kind), None) => error_from_kind(kind),
469 (None, Some(message)) => other_error_owned(format!(
470 "streaming encoder is in an errored state: {message}"
471 )),
472 (None, None) => other_error("streaming encoder is in an errored state"),
473 }
474 }
475
476 fn drain_mut(&mut self) -> Result<&mut W, Error> {
477 self.drain
478 .as_mut()
479 .ok_or_else(|| other_error("streaming encoder has no active drain"))
480 }
481
482 fn ensure_frame_started(&mut self) -> Result<(), Error> {
483 if self.frame_started {
484 return Ok(());
485 }
486
487 self.ensure_level_supported()?;
488 let use_dictionary_state =
496 !matches!(self.compression_level, CompressionLevel::Uncompressed)
497 && self.state.matcher.supports_dictionary_priming()
498 && self.dictionary.is_some();
499 if use_dictionary_state && let Some(dict) = self.dictionary.as_ref() {
502 self.state
503 .matcher
504 .set_dictionary_size_hint(dict.inner.dict_content.len());
505 }
506 self.state.matcher.reset(self.compression_level);
507 self.state.offset_hist = if use_dictionary_state {
510 self.dictionary
511 .as_ref()
512 .map(|dict| dict.inner.offset_hist)
513 .unwrap_or([1, 4, 8])
514 } else {
515 [1, 4, 8]
516 };
517 if use_dictionary_state && let Some(dict) = self.dictionary.as_ref() {
522 let offset_hist = dict.inner.offset_hist;
523 self.state
524 .matcher
525 .prime_with_dictionary(dict.inner.dict_content.as_slice(), offset_hist);
526 }
527 if use_dictionary_state && let Some(cache) = self.dictionary_entropy_cache.as_ref() {
530 self.state.last_huff_table.clone_from(&cache.huff);
531 self.state
532 .fse_tables
533 .ll_previous
534 .clone_from(&cache.ll_previous);
535 self.state
536 .fse_tables
537 .ml_previous
538 .clone_from(&cache.ml_previous);
539 self.state
540 .fse_tables
541 .of_previous
542 .clone_from(&cache.of_previous);
543 let ll_entropy = match cache.ll_previous.as_ref() {
544 Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
545 _ => None,
546 };
547 let ml_entropy = match cache.ml_previous.as_ref() {
548 Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
549 _ => None,
550 };
551 let of_entropy = match cache.of_previous.as_ref() {
552 Some(PreviousFseTable::Custom(table)) => Some(table.as_ref()),
553 _ => None,
554 };
555 self.state.matcher.seed_dictionary_entropy(
556 self.state.last_huff_table.as_ref(),
557 ll_entropy,
558 ml_entropy,
559 of_entropy,
560 );
561 } else {
562 self.state.last_huff_table = None;
563 self.state.fse_tables.ll_previous = None;
564 self.state.fse_tables.ml_previous = None;
565 self.state.fse_tables.of_previous = None;
566 }
567 self.state.strategy_tag = self.strategy_override.unwrap_or_else(|| {
575 crate::encoding::strategy::StrategyTag::for_compression_level(self.compression_level)
576 });
577 self.state.huf_optimal_search = crate::encoding::frame_compressor::fast_huf_search_enabled(
578 self.state.strategy_tag,
579 self.pledged_content_size.or(self.source_size_hint),
580 );
581 #[cfg(feature = "hash")]
582 {
583 self.hasher = XxHash64::with_seed(0);
584 }
585
586 let window_size = self.state.matcher.window_size();
587 if window_size == 0 {
588 return Err(invalid_input_error(
589 "matcher reported window_size == 0, which is invalid",
590 ));
591 }
592
593 let single_segment = self.content_size_flag
602 && !use_dictionary_state
603 && self
604 .pledged_content_size
605 .map(|size| (512..=(1 << 14)).contains(&size) && size <= window_size)
606 .unwrap_or(false);
607
608 let header = FrameHeader {
609 frame_content_size: if self.content_size_flag {
610 self.pledged_content_size
611 } else {
612 None
613 },
614 single_segment,
615 content_checksum: cfg!(feature = "hash") && self.content_checksum,
616 dictionary_id: if use_dictionary_state && self.dictionary_id_flag {
617 self.dictionary.as_ref().map(|dict| dict.inner.id as u64)
618 } else {
619 None
620 },
621 window_size: if single_segment {
622 None
623 } else {
624 Some(window_size)
625 },
626 magicless: self.magicless,
627 };
628 let mut encoded_header = Vec::new();
629 header.serialize(&mut encoded_header);
630 self.drain_mut()
631 .and_then(|drain| drain.write_all(&encoded_header))
632 .map_err(|err| self.fail(err))?;
633
634 self.frame_started = true;
635 Ok(())
636 }
637
638 fn block_capacity(&self) -> usize {
639 let matcher_window = self.state.matcher.window_size() as usize;
640 let ceiling = self
641 .target_block_size
642 .map_or(MAX_BLOCK_SIZE as usize, |t| t as usize);
643 core::cmp::max(1, core::cmp::min(matcher_window, ceiling))
644 }
645
646 fn allocate_pending_space(&mut self, block_capacity: usize) -> Vec<u8> {
647 let mut space = match self.compression_level {
648 CompressionLevel::Fastest
649 | CompressionLevel::Default
650 | CompressionLevel::Better
651 | CompressionLevel::Best
652 | CompressionLevel::Level(_) => self.state.matcher.get_next_space(),
653 CompressionLevel::Uncompressed => Vec::new(),
654 };
655 space.clear();
656 if space.capacity() > block_capacity {
657 space.shrink_to(block_capacity);
658 }
659 if space.capacity() < block_capacity {
660 space.reserve(block_capacity - space.capacity());
661 }
662 space
663 }
664
665 fn emit_full_pending_block(
666 &mut self,
667 block_capacity: usize,
668 consumed: usize,
669 ) -> Option<Result<usize, Error>> {
670 if self.pending.len() != block_capacity {
671 return None;
672 }
673
674 let new_pending = self.allocate_pending_space(block_capacity);
675 let full_block = mem::replace(&mut self.pending, new_pending);
676 if let Err((err, restored_block)) = self.encode_block(full_block, false) {
677 self.pending = restored_block;
678 let err = self.fail(err);
679 if consumed > 0 {
680 return Some(Ok(consumed));
681 }
682 return Some(Err(err));
683 }
684 None
685 }
686
687 fn emit_pending_block(&mut self, last_block: bool) -> Result<(), Error> {
688 let block = mem::take(&mut self.pending);
689 if let Err((err, restored_block)) = self.encode_block(block, last_block) {
690 self.pending = restored_block;
691 return Err(self.fail(err));
692 }
693 if !last_block {
694 let block_capacity = self.block_capacity();
695 self.pending = self.allocate_pending_space(block_capacity);
696 }
697 Ok(())
698 }
699
700 fn ensure_level_supported(&self) -> Result<(), Error> {
704 match self.compression_level {
705 CompressionLevel::Uncompressed
706 | CompressionLevel::Fastest
707 | CompressionLevel::Default
708 | CompressionLevel::Better
709 | CompressionLevel::Best
710 | CompressionLevel::Level(_) => Ok(()),
711 }
712 }
713
714 fn encode_block(
715 &mut self,
716 uncompressed_data: Vec<u8>,
717 last_block: bool,
718 ) -> Result<(), (Error, Vec<u8>)> {
719 let mut raw_block = Some(uncompressed_data);
720 let mut encoded = Vec::new();
721 mem::swap(&mut encoded, &mut self.encoded_scratch);
722 encoded.clear();
723 let needed_capacity = self.block_capacity() + 3;
724 if encoded.capacity() < needed_capacity {
725 encoded.reserve(needed_capacity.saturating_sub(encoded.len()));
726 }
727 let mut moved_into_matcher = false;
728 if raw_block.as_ref().is_some_and(|block| block.is_empty()) {
729 let header = BlockHeader {
730 last_block,
731 block_type: crate::blocks::block::BlockType::Raw,
732 block_size: 0,
733 };
734 header.serialize(&mut encoded);
735 } else {
736 match self.compression_level {
737 CompressionLevel::Uncompressed => {
738 let block = raw_block.as_ref().expect("raw block missing");
739 let header = BlockHeader {
740 last_block,
741 block_type: crate::blocks::block::BlockType::Raw,
742 block_size: block.len() as u32,
743 };
744 header.serialize(&mut encoded);
745 encoded.extend_from_slice(block);
746 }
747 CompressionLevel::Fastest
748 | CompressionLevel::Default
749 | CompressionLevel::Better
750 | CompressionLevel::Best
751 | CompressionLevel::Level(_) => {
752 let block = raw_block.take().expect("raw block missing");
753 debug_assert!(!block.is_empty(), "empty blocks handled above");
754 let dict_active = self.dictionary.is_some()
761 && self.state.matcher.supports_dictionary_priming();
762 compress_block_encoded(
763 &mut self.state,
764 self.compression_level,
765 last_block,
766 block,
767 &mut encoded,
768 dict_active,
769 #[cfg(feature = "lsm")]
772 None,
773 #[cfg(all(feature = "lsm", feature = "hash"))]
774 None,
775 );
776 moved_into_matcher = true;
777 }
778 }
779 }
780
781 if let Err(err) = self.drain_mut().and_then(|drain| drain.write_all(&encoded)) {
782 encoded.clear();
783 mem::swap(&mut encoded, &mut self.encoded_scratch);
784 let restored = if moved_into_matcher {
785 self.state.matcher.get_last_space().to_vec()
786 } else {
787 raw_block.unwrap_or_default()
788 };
789 return Err((err, restored));
790 }
791
792 if moved_into_matcher {
793 #[cfg(feature = "hash")]
794 if self.content_checksum {
795 self.hasher.write(self.state.matcher.get_last_space());
796 }
797 } else {
798 self.hash_block(raw_block.as_deref().unwrap_or(&[]));
799 }
800 encoded.clear();
801 mem::swap(&mut encoded, &mut self.encoded_scratch);
802 Ok(())
803 }
804
805 fn write_empty_last_block(&mut self) -> Result<(), Error> {
806 self.encode_block(Vec::new(), true).map_err(|(err, _)| err)
807 }
808
809 fn fail(&mut self, err: Error) -> Error {
810 self.errored = true;
811 if self.last_error_kind.is_none() {
812 self.last_error_kind = Some(err.kind());
813 }
814 if self.last_error_message.is_none() {
815 self.last_error_message = Some(err.to_string());
816 }
817 err
818 }
819
820 #[cfg(feature = "hash")]
821 fn hash_block(&mut self, uncompressed_data: &[u8]) {
822 if self.content_checksum {
823 self.hasher.write(uncompressed_data);
824 }
825 }
826
827 #[cfg(not(feature = "hash"))]
828 fn hash_block(&mut self, _uncompressed_data: &[u8]) {}
829}
830
831impl<W: Write, M: Matcher> Write for StreamingEncoder<W, M> {
832 fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
833 self.ensure_open()?;
834 if buf.is_empty() {
835 return Ok(0);
836 }
837
838 if let Some(pledged) = self.pledged_content_size
842 && self.bytes_consumed >= pledged
843 {
844 return Err(invalid_input_error(
845 "write would exceed pledged content size",
846 ));
847 }
848
849 self.ensure_frame_started()?;
850
851 let buf = if let Some(pledged) = self.pledged_content_size {
855 let remaining_allowed = pledged
856 .checked_sub(self.bytes_consumed)
857 .ok_or_else(|| invalid_input_error("bytes consumed exceed pledged content size"))?;
858 if remaining_allowed == 0 {
859 return Err(invalid_input_error(
860 "write would exceed pledged content size",
861 ));
862 }
863 let accepted = core::cmp::min(
864 buf.len(),
865 usize::try_from(remaining_allowed).unwrap_or(usize::MAX),
866 );
867 &buf[..accepted]
868 } else {
869 buf
870 };
871
872 let block_capacity = self.block_capacity();
873 if self.pending.capacity() == 0 {
874 self.pending = self.allocate_pending_space(block_capacity);
875 }
876 let mut remaining = buf;
877 let mut consumed = 0usize;
878
879 while !remaining.is_empty() {
880 if let Some(result) = self.emit_full_pending_block(block_capacity, consumed) {
881 return result;
882 }
883
884 let available = block_capacity - self.pending.len();
885 let to_take = core::cmp::min(remaining.len(), available);
886 if to_take == 0 {
887 break;
888 }
889 self.pending.extend_from_slice(&remaining[..to_take]);
890 remaining = &remaining[to_take..];
891 consumed += to_take;
892
893 if let Some(result) = self.emit_full_pending_block(block_capacity, consumed) {
894 if let Ok(n) = &result {
895 self.bytes_consumed += *n as u64;
896 }
897 return result;
898 }
899 }
900 self.bytes_consumed += consumed as u64;
901 Ok(consumed)
902 }
903
904 fn flush(&mut self) -> Result<(), Error> {
905 self.ensure_open()?;
906 if self.pending.is_empty() {
907 return self
908 .drain_mut()
909 .and_then(|drain| drain.flush())
910 .map_err(|err| self.fail(err));
911 }
912 self.ensure_frame_started()?;
913 self.emit_pending_block(false)?;
914 self.drain_mut()
915 .and_then(|drain| drain.flush())
916 .map_err(|err| self.fail(err))
917 }
918}
919
/// Builds a bare error that carries only `kind`.
fn error_from_kind(kind: ErrorKind) -> Error {
    kind.into()
}
923
924fn error_with_kind_message(kind: ErrorKind, message: String) -> Error {
925 #[cfg(feature = "std")]
926 {
927 Error::new(kind, message)
928 }
929 #[cfg(not(feature = "std"))]
930 {
931 Error::new(kind, alloc::boxed::Box::new(message))
932 }
933}
934
935fn invalid_input_error(message: &str) -> Error {
936 #[cfg(feature = "std")]
937 {
938 Error::new(ErrorKind::InvalidInput, message)
939 }
940 #[cfg(not(feature = "std"))]
941 {
942 Error::new(
943 ErrorKind::Other,
944 alloc::boxed::Box::new(alloc::string::String::from(message)),
945 )
946 }
947}
948
949fn other_error_owned(message: String) -> Error {
950 #[cfg(feature = "std")]
951 {
952 Error::other(message)
953 }
954 #[cfg(not(feature = "std"))]
955 {
956 Error::new(ErrorKind::Other, alloc::boxed::Box::new(message))
957 }
958}
959
960fn other_error(message: &str) -> Error {
961 #[cfg(feature = "std")]
962 {
963 Error::other(message)
964 }
965 #[cfg(not(feature = "std"))]
966 {
967 Error::new(
968 ErrorKind::Other,
969 alloc::boxed::Box::new(alloc::string::String::from(message)),
970 )
971 }
972}
973
974#[cfg(test)]
975mod tests {
976 use crate::decoding::StreamingDecoder;
977 use crate::encoding::{CompressionLevel, Matcher, Sequence, StreamingEncoder};
978 use crate::io::{Error, ErrorKind, Read, Write};
979 use alloc::vec;
980 use alloc::vec::Vec;
981
982 struct TinyMatcher {
983 last_space: Vec<u8>,
984 window_size: u64,
985 }
986
987 impl TinyMatcher {
988 fn new(window_size: u64) -> Self {
989 Self {
990 last_space: Vec::new(),
991 window_size,
992 }
993 }
994 }
995
996 impl Matcher for TinyMatcher {
997 fn get_next_space(&mut self) -> Vec<u8> {
998 vec![0; self.window_size as usize]
999 }
1000
1001 fn get_last_space(&mut self) -> &[u8] {
1002 self.last_space.as_slice()
1003 }
1004
1005 fn commit_space(&mut self, space: Vec<u8>) {
1006 self.last_space = space;
1007 }
1008
1009 fn skip_matching(&mut self) {}
1010
1011 fn start_matching(&mut self, mut handle_sequence: impl for<'a> FnMut(Sequence<'a>)) {
1012 handle_sequence(Sequence::Literals {
1013 literals: self.last_space.as_slice(),
1014 });
1015 }
1016
1017 fn reset(&mut self, _level: CompressionLevel) {
1018 self.last_space.clear();
1019 }
1020
1021 fn window_size(&self) -> u64 {
1022 self.window_size
1023 }
1024 }
1025
1026 struct FailingWriteOnce {
1027 writes: usize,
1028 fail_on_write_number: usize,
1029 sink: Vec<u8>,
1030 }
1031
1032 impl FailingWriteOnce {
1033 fn new(fail_on_write_number: usize) -> Self {
1034 Self {
1035 writes: 0,
1036 fail_on_write_number,
1037 sink: Vec::new(),
1038 }
1039 }
1040 }
1041
1042 impl Write for FailingWriteOnce {
1043 fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
1044 self.writes += 1;
1045 if self.writes == self.fail_on_write_number {
1046 return Err(super::other_error("injected write failure"));
1047 }
1048 self.sink.extend_from_slice(buf);
1049 Ok(buf.len())
1050 }
1051
1052 fn flush(&mut self) -> Result<(), Error> {
1053 Ok(())
1054 }
1055 }
1056
1057 struct FailingWithKind {
1058 writes: usize,
1059 fail_on_write_number: usize,
1060 kind: ErrorKind,
1061 }
1062
1063 impl FailingWithKind {
1064 fn new(fail_on_write_number: usize, kind: ErrorKind) -> Self {
1065 Self {
1066 writes: 0,
1067 fail_on_write_number,
1068 kind,
1069 }
1070 }
1071 }
1072
1073 impl Write for FailingWithKind {
1074 fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
1075 self.writes += 1;
1076 if self.writes == self.fail_on_write_number {
1077 return Err(Error::from(self.kind));
1078 }
1079 Ok(buf.len())
1080 }
1081
1082 fn flush(&mut self) -> Result<(), Error> {
1083 Ok(())
1084 }
1085 }
1086
1087 struct PartialThenFailWriter {
1088 writes: usize,
1089 fail_on_write_number: usize,
1090 partial_prefix_len: usize,
1091 terminal_failure: bool,
1092 sink: Vec<u8>,
1093 }
1094
1095 impl PartialThenFailWriter {
1096 fn new(fail_on_write_number: usize, partial_prefix_len: usize) -> Self {
1097 Self {
1098 writes: 0,
1099 fail_on_write_number,
1100 partial_prefix_len,
1101 terminal_failure: false,
1102 sink: Vec::new(),
1103 }
1104 }
1105 }
1106
1107 impl Write for PartialThenFailWriter {
1108 fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
1109 if self.terminal_failure {
1110 return Err(super::other_error("injected terminal write failure"));
1111 }
1112
1113 self.writes += 1;
1114 if self.writes == self.fail_on_write_number {
1115 let written = core::cmp::min(self.partial_prefix_len, buf.len());
1116 if written > 0 {
1117 self.sink.extend_from_slice(&buf[..written]);
1118 self.terminal_failure = true;
1119 return Ok(written);
1120 }
1121 return Err(super::other_error("injected terminal write failure"));
1122 }
1123
1124 self.sink.extend_from_slice(buf);
1125 Ok(buf.len())
1126 }
1127
1128 fn flush(&mut self) -> Result<(), Error> {
1129 Ok(())
1130 }
1131 }
1132
1133 #[test]
1137 fn streaming_encoder_set_magicless_before_write_omits_magic_and_roundtrips() {
1138 use crate::common::MAGIC_NUM;
1139 let payload = b"streaming-magicless-roundtrip-".repeat(64);
1140
1141 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1142 encoder
1143 .set_magicless(true)
1144 .expect("set_magicless pre-write");
1145 encoder.write_all(&payload).unwrap();
1146 let compressed = encoder.finish().unwrap();
1147
1148 assert!(
1149 !compressed.starts_with(&MAGIC_NUM.to_le_bytes()),
1150 "magicless frame must omit the 4-byte magic prefix",
1151 );
1152
1153 let mut decoder = crate::decoding::FrameDecoder::new();
1154 decoder.set_magicless(true);
1155 let mut cursor: &[u8] = compressed.as_slice();
1156 decoder.init(&mut cursor).expect("magicless init");
1157 decoder
1158 .decode_blocks(&mut cursor, crate::decoding::BlockDecodingStrategy::All)
1159 .expect("decode_blocks");
1160 let mut decoded: Vec<u8> = Vec::new();
1161 decoder
1162 .collect_to_writer(&mut decoded)
1163 .expect("collect_to_writer");
1164 assert_eq!(decoded, payload);
1165 }
1166
1167 #[test]
1172 fn streaming_encoder_set_magicless_after_first_write_errors() {
1173 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1174 encoder.write_all(b"first-block").unwrap();
1175 let err = encoder
1176 .set_magicless(true)
1177 .expect_err("set_magicless after first write must error");
1178 assert_eq!(
1179 err.kind(),
1180 crate::io::ErrorKind::InvalidInput,
1181 "expected InvalidInput when setting magicless after frame_started, got {err:?}",
1182 );
1183 }
1184
1185 #[test]
1186 fn streaming_encoder_roundtrip_multiple_writes() {
1187 let payload = b"streaming-encoder-roundtrip-".repeat(1024);
1188 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1189 for chunk in payload.chunks(313) {
1190 encoder.write_all(chunk).unwrap();
1191 }
1192 let compressed = encoder.finish().unwrap();
1193
1194 let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1195 let mut decoded = Vec::new();
1196 decoder.read_to_end(&mut decoded).unwrap();
1197 assert_eq!(decoded, payload);
1198 }
1199
1200 #[test]
1201 fn flush_emits_nonempty_partial_output() {
1202 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1203 encoder.write_all(b"partial-block").unwrap();
1204 encoder.flush().unwrap();
1205 let flushed_len = encoder.get_ref().len();
1206 assert!(
1207 flushed_len > 0,
1208 "flush should emit header+partial block bytes"
1209 );
1210 let compressed = encoder.finish().unwrap();
1211 let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1212 let mut decoded = Vec::new();
1213 decoder.read_to_end(&mut decoded).unwrap();
1214 assert_eq!(decoded, b"partial-block");
1215 }
1216
1217 #[test]
1218 fn flush_without_writes_does_not_emit_frame_header() {
1219 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1220 encoder.flush().unwrap();
1221 assert!(encoder.get_ref().is_empty());
1222 }
1223
1224 #[test]
1225 fn block_boundary_write_emits_block_in_same_call() {
1226 let mut boundary = StreamingEncoder::new_with_matcher(
1227 TinyMatcher::new(4),
1228 Vec::new(),
1229 CompressionLevel::Uncompressed,
1230 );
1231 let mut below = StreamingEncoder::new_with_matcher(
1232 TinyMatcher::new(4),
1233 Vec::new(),
1234 CompressionLevel::Uncompressed,
1235 );
1236
1237 boundary.write_all(b"ABCD").unwrap();
1238 below.write_all(b"ABC").unwrap();
1239
1240 let boundary_len = boundary.get_ref().len();
1241 let below_len = below.get_ref().len();
1242 assert!(
1243 boundary_len > below_len,
1244 "full block should be emitted immediately at block boundary"
1245 );
1246 }
1247
1248 #[test]
1249 fn finish_consumes_encoder_and_emits_frame() {
1250 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1251 encoder.write_all(b"abc").unwrap();
1252 let compressed = encoder.finish().unwrap();
1253 let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1254 let mut decoded = Vec::new();
1255 decoder.read_to_end(&mut decoded).unwrap();
1256 assert_eq!(decoded, b"abc");
1257 }
1258
1259 #[test]
1260 fn finish_without_writes_emits_empty_frame() {
1261 let encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1262 let compressed = encoder.finish().unwrap();
1263 let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1264 let mut decoded = Vec::new();
1265 decoder.read_to_end(&mut decoded).unwrap();
1266 assert!(decoded.is_empty());
1267 }
1268
1269 #[test]
1270 fn write_empty_buffer_returns_zero() {
1271 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1272 assert_eq!(encoder.write(&[]).unwrap(), 0);
1273 let _ = encoder.finish().unwrap();
1274 }
1275
1276 #[test]
1277 fn uncompressed_level_roundtrip() {
1278 let payload = b"uncompressed-streaming-roundtrip".repeat(64);
1279 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Uncompressed);
1280 for chunk in payload.chunks(41) {
1281 encoder.write_all(chunk).unwrap();
1282 }
1283 let compressed = encoder.finish().unwrap();
1284 let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1285 let mut decoded = Vec::new();
1286 decoder.read_to_end(&mut decoded).unwrap();
1287 assert_eq!(decoded, payload);
1288 }
1289
1290 #[test]
1291 fn better_level_streaming_roundtrip() {
1292 let payload = b"better-level-streaming-test".repeat(256);
1293 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Better);
1294 for chunk in payload.chunks(53) {
1295 encoder.write_all(chunk).unwrap();
1296 }
1297 let compressed = encoder.finish().unwrap();
1298 let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1299 let mut decoded = Vec::new();
1300 decoder.read_to_end(&mut decoded).unwrap();
1301 assert_eq!(decoded, payload);
1302 }
1303
1304 #[test]
1305 fn zero_window_matcher_returns_invalid_input_error() {
1306 let mut encoder = StreamingEncoder::new_with_matcher(
1307 TinyMatcher::new(0),
1308 Vec::new(),
1309 CompressionLevel::Fastest,
1310 );
1311 let err = encoder.write_all(b"payload").unwrap_err();
1312 assert_eq!(err.kind(), ErrorKind::InvalidInput);
1313 }
1314
1315 #[test]
1316 fn best_level_streaming_roundtrip() {
1317 let payload = b"best-level-streaming-test".repeat(8 * 1024);
1320 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Best);
1321 for chunk in payload.chunks(53) {
1322 encoder.write_all(chunk).unwrap();
1323 }
1324 let compressed = encoder.finish().unwrap();
1325 let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1326 let mut decoded = Vec::new();
1327 decoder.read_to_end(&mut decoded).unwrap();
1328 assert_eq!(decoded, payload);
1329 }
1330
1331 #[test]
1332 fn write_failure_poisoning_is_sticky() {
1333 let mut encoder = StreamingEncoder::new_with_matcher(
1334 TinyMatcher::new(4),
1335 FailingWriteOnce::new(1),
1336 CompressionLevel::Uncompressed,
1337 );
1338
1339 assert!(encoder.write_all(b"ABCD").is_err());
1340 assert!(encoder.flush().is_err());
1341 assert!(encoder.write_all(b"EFGH").is_err());
1342 assert_eq!(encoder.get_ref().sink.len(), 0);
1343 assert!(encoder.finish().is_err());
1344 }
1345
1346 #[test]
1347 fn poisoned_encoder_returns_original_error_kind() {
1348 let mut encoder = StreamingEncoder::new_with_matcher(
1349 TinyMatcher::new(4),
1350 FailingWithKind::new(1, ErrorKind::BrokenPipe),
1351 CompressionLevel::Uncompressed,
1352 );
1353
1354 let first_error = encoder.write_all(b"ABCD").unwrap_err();
1355 assert_eq!(first_error.kind(), ErrorKind::BrokenPipe);
1356
1357 let second_error = encoder.write_all(b"EFGH").unwrap_err();
1358 assert_eq!(second_error.kind(), ErrorKind::BrokenPipe);
1359 }
1360
1361 #[test]
1362 fn write_reports_progress_but_poisoning_is_sticky_after_later_block_failure() {
1363 let payload = b"ABCDEFGHIJKL";
1364 let mut encoder = StreamingEncoder::new_with_matcher(
1365 TinyMatcher::new(4),
1366 FailingWriteOnce::new(3),
1367 CompressionLevel::Uncompressed,
1368 );
1369
1370 let first_write = encoder.write(payload).unwrap();
1371 assert_eq!(first_write, 8);
1372 assert!(encoder.write(&payload[first_write..]).is_err());
1373 assert!(encoder.flush().is_err());
1374 assert!(encoder.write_all(b"EFGH").is_err());
1375 }
1376
1377 #[test]
1378 fn partial_write_failure_after_progress_poisons_encoder() {
1379 let payload = b"ABCDEFGHIJKL";
1380 let mut encoder = StreamingEncoder::new_with_matcher(
1381 TinyMatcher::new(4),
1382 PartialThenFailWriter::new(3, 1),
1383 CompressionLevel::Uncompressed,
1384 );
1385
1386 let first_write = encoder.write(payload).unwrap();
1387 assert_eq!(first_write, 8);
1388
1389 let second_write = encoder.write(&payload[first_write..]);
1390 assert!(second_write.is_err());
1391 assert!(encoder.flush().is_err());
1392 assert!(encoder.write_all(b"MNOP").is_err());
1393 }
1394
1395 #[test]
1396 fn new_with_matcher_and_get_mut_work() {
1397 let matcher = TinyMatcher::new(128 * 1024);
1398 let mut encoder =
1399 StreamingEncoder::new_with_matcher(matcher, Vec::new(), CompressionLevel::Fastest);
1400 encoder.get_mut().extend_from_slice(b"");
1401 encoder.write_all(b"custom-matcher").unwrap();
1402 let compressed = encoder.finish().unwrap();
1403 let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1404 let mut decoded = Vec::new();
1405 decoder.read_to_end(&mut decoded).unwrap();
1406 assert_eq!(decoded, b"custom-matcher");
1407 }
1408
1409 #[test]
1410 fn pledged_content_size_written_in_header() {
1411 let payload = b"hello world, pledged size test";
1412 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1413 encoder
1414 .set_pledged_content_size(payload.len() as u64)
1415 .unwrap();
1416 encoder.write_all(payload).unwrap();
1417 let compressed = encoder.finish().unwrap();
1418
1419 let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1421 .unwrap()
1422 .0;
1423 assert_eq!(header.frame_content_size(), payload.len() as u64);
1424
1425 let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1427 let mut decoded = Vec::new();
1428 decoder.read_to_end(&mut decoded).unwrap();
1429 assert_eq!(decoded, payload);
1430 }
1431
1432 #[test]
1433 fn pledged_content_size_mismatch_returns_error() {
1434 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1435 encoder.set_pledged_content_size(100).unwrap();
1436 encoder.write_all(b"short payload").unwrap(); let err = encoder.finish().unwrap_err();
1438 assert_eq!(err.kind(), ErrorKind::InvalidInput);
1439 }
1440
1441 #[test]
1442 fn write_exceeding_pledge_returns_error() {
1443 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1444 encoder.set_pledged_content_size(5).unwrap();
1445 let err = encoder.write_all(b"exceeds five bytes").unwrap_err();
1446 assert_eq!(err.kind(), ErrorKind::InvalidInput);
1447 }
1448
1449 #[test]
1450 fn write_straddling_pledge_reports_partial_progress() {
1451 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1452 encoder.set_pledged_content_size(5).unwrap();
1453 assert_eq!(encoder.write(b"abcdef").unwrap(), 5);
1455 let err = encoder.write(b"g").unwrap_err();
1457 assert_eq!(err.kind(), ErrorKind::InvalidInput);
1458 }
1459
1460 #[test]
1461 fn encoded_scratch_capacity_is_reused_across_blocks() {
1462 let payload = vec![0xAB; 64 * 3];
1463 let mut encoder = StreamingEncoder::new_with_matcher(
1464 TinyMatcher::new(64),
1465 Vec::new(),
1466 CompressionLevel::Uncompressed,
1467 );
1468
1469 encoder.write_all(&payload[..64]).unwrap();
1470 let first_capacity = encoder.encoded_scratch.capacity();
1471 assert!(
1472 first_capacity >= 67,
1473 "expected encoded scratch to keep block header + payload capacity",
1474 );
1475
1476 encoder.write_all(&payload[64..128]).unwrap();
1477 let second_capacity = encoder.encoded_scratch.capacity();
1478 assert!(
1479 second_capacity >= first_capacity,
1480 "encoded scratch capacity should be reused across block emits",
1481 );
1482
1483 encoder.write_all(&payload[128..]).unwrap();
1484 let compressed = encoder.finish().unwrap();
1485 let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1486 let mut decoded = Vec::new();
1487 decoder.read_to_end(&mut decoded).unwrap();
1488 assert_eq!(decoded, payload);
1489 }
1490
1491 #[test]
1492 fn pledged_content_size_after_write_returns_error() {
1493 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1494 encoder.write_all(b"already writing").unwrap();
1495 let err = encoder.set_pledged_content_size(15).unwrap_err();
1496 assert_eq!(err.kind(), ErrorKind::InvalidInput);
1497 }
1498
1499 #[test]
1500 fn source_size_hint_directly_reduces_window_header() {
1501 let payload = b"streaming-source-size-hint".repeat(64);
1502
1503 let mut no_hint = StreamingEncoder::new(Vec::new(), CompressionLevel::from_level(11));
1504 no_hint.write_all(payload.as_slice()).unwrap();
1505 let no_hint_frame = no_hint.finish().unwrap();
1506 let no_hint_header = crate::decoding::frame::read_frame_header(no_hint_frame.as_slice())
1507 .unwrap()
1508 .0;
1509 let no_hint_window = no_hint_header.window_size().unwrap();
1510
1511 let mut with_hint = StreamingEncoder::new(Vec::new(), CompressionLevel::from_level(11));
1512 with_hint
1513 .set_source_size_hint(payload.len() as u64)
1514 .unwrap();
1515 with_hint.write_all(payload.as_slice()).unwrap();
1516 let late_hint_err = with_hint
1517 .set_source_size_hint(payload.len() as u64)
1518 .unwrap_err();
1519 assert_eq!(late_hint_err.kind(), ErrorKind::InvalidInput);
1520 let with_hint_frame = with_hint.finish().unwrap();
1521 let with_hint_header =
1522 crate::decoding::frame::read_frame_header(with_hint_frame.as_slice())
1523 .unwrap()
1524 .0;
1525 let with_hint_window = with_hint_header.window_size().unwrap();
1526
1527 assert!(
1528 with_hint_window <= no_hint_window,
1529 "source size hint should not increase advertised window"
1530 );
1531
1532 let mut decoder = StreamingDecoder::new(with_hint_frame.as_slice()).unwrap();
1533 let mut decoded = Vec::new();
1534 decoder.read_to_end(&mut decoded).unwrap();
1535 assert_eq!(decoded, payload);
1536 }
1537
1538 #[test]
1539 fn single_segment_requires_pledged_to_fit_matcher_window() {
1540 let payload = b"streaming-window-gate-".repeat(60); let mut encoder = StreamingEncoder::new_with_matcher(
1542 TinyMatcher::new(1024),
1543 Vec::new(),
1544 CompressionLevel::Fastest,
1545 );
1546 encoder
1547 .set_pledged_content_size(payload.len() as u64)
1548 .unwrap();
1549 encoder.write_all(payload.as_slice()).unwrap();
1550 let compressed = encoder.finish().unwrap();
1551
1552 let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1553 .unwrap()
1554 .0;
1555 assert_eq!(header.frame_content_size(), payload.len() as u64);
1556 assert!(
1557 !header.descriptor.single_segment_flag(),
1558 "single-segment must stay off when pledged content size exceeds matcher window"
1559 );
1560 assert!(
1561 header.window_size().unwrap() >= 1024,
1562 "window descriptor should be present when single-segment is disabled"
1563 );
1564 }
1565
1566 #[test]
1567 fn ensure_frame_started_refreshes_stale_strategy_tag_at_reset() {
1568 use crate::encoding::strategy::StrategyTag;
1584 for level in [
1585 CompressionLevel::Fastest,
1586 CompressionLevel::Default,
1587 CompressionLevel::Better,
1588 CompressionLevel::Best,
1589 ] {
1590 let expected = StrategyTag::for_compression_level(level);
1591 let mut encoder = StreamingEncoder::new(Vec::new(), level);
1592 let sentinel = StrategyTag::BtUltra2;
1597 assert_ne!(
1598 expected, sentinel,
1599 "sentinel must differ from the legitimate tag at level {level:?}",
1600 );
1601 encoder.state.strategy_tag = sentinel;
1602 encoder.write_all(b"x").unwrap();
1603 assert_eq!(
1604 encoder.state.strategy_tag, expected,
1605 "reset-time strategy_tag sync missing at level {level:?}: \
1606 sentinel survived `ensure_frame_started`",
1607 );
1608 let _ = encoder.finish().unwrap();
1609 }
1610 }
1611
1612 #[test]
1621 fn level_22_streaming_window_roundtrips_in_our_decoder() {
1622 let payload = b"level-22-streaming-window-cap-".repeat(512);
1623 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::from_level(22));
1624 for chunk in payload.chunks(101) {
1625 encoder.write_all(chunk).unwrap();
1626 }
1627 let compressed = encoder.finish().unwrap();
1628
1629 let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1632 .unwrap()
1633 .0;
1634 let window = header.window_size().unwrap();
1635 assert!(
1636 window <= crate::common::MAXIMUM_ALLOWED_WINDOW_SIZE,
1637 "L22 advertised window {window} exceeds decoder cap {}",
1638 crate::common::MAXIMUM_ALLOWED_WINDOW_SIZE,
1639 );
1640
1641 let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1642 let mut decoded = Vec::new();
1643 decoder.read_to_end(&mut decoded).unwrap();
1644 assert_eq!(decoded, payload);
1645 }
1646
1647 #[test]
1651 fn streaming_encoder_set_content_checksum_false_clears_header_flag() {
1652 let payload = b"streaming-checksum-toggle-".repeat(64);
1653 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1654 encoder
1655 .set_content_checksum(false)
1656 .expect("set_content_checksum pre-write");
1657 encoder.write_all(&payload).unwrap();
1658 let compressed = encoder.finish().unwrap();
1659
1660 let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1661 .unwrap()
1662 .0;
1663 assert!(
1664 !header.descriptor.content_checksum_flag(),
1665 "content_checksum(false) must clear the frame header flag",
1666 );
1667
1668 let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1669 let mut decoded = Vec::new();
1670 decoder.read_to_end(&mut decoded).unwrap();
1671 assert_eq!(decoded, payload);
1672 }
1673
1674 #[cfg(feature = "hash")]
1678 #[test]
1679 fn streaming_encoder_set_content_checksum_false_omits_trailer() {
1680 let payload = b"streaming-checksum-trailer-".repeat(64);
1681
1682 let mut with = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1683 with.set_content_checksum(true)
1685 .expect("set_content_checksum pre-write");
1686 with.write_all(&payload).unwrap();
1687 let with_checksum = with.finish().unwrap();
1688
1689 let mut without = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1690 without
1691 .set_content_checksum(false)
1692 .expect("set_content_checksum pre-write");
1693 without.write_all(&payload).unwrap();
1694 let without_checksum = without.finish().unwrap();
1695
1696 assert!(
1697 crate::decoding::frame::read_frame_header(with_checksum.as_slice())
1698 .unwrap()
1699 .0
1700 .descriptor
1701 .content_checksum_flag(),
1702 "default checksum-on frame must set the header flag",
1703 );
1704 assert_eq!(
1705 with_checksum.len(),
1706 without_checksum.len() + 4,
1707 "checksum-on frame must carry exactly the 4-byte XXH64 trailer",
1708 );
1709 }
1710
1711 #[test]
1716 fn streaming_encoder_set_content_checksum_after_first_write_errors() {
1717 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1718 encoder.write_all(b"first-block").unwrap();
1719 let err = encoder
1720 .set_content_checksum(false)
1721 .expect_err("set_content_checksum after first write must error");
1722 assert_eq!(
1723 err.kind(),
1724 ErrorKind::InvalidInput,
1725 "expected InvalidInput when setting content checksum after frame_started, got {err:?}",
1726 );
1727 }
1728
1729 #[test]
1730 fn no_pledged_size_omits_fcs_from_header() {
1731 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
1732 encoder.write_all(b"no pledged size").unwrap();
1733 let compressed = encoder.finish().unwrap();
1734
1735 let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1737 .unwrap()
1738 .0;
1739 assert_eq!(header.frame_content_size(), 0);
1740 assert_eq!(header.descriptor.frame_content_size_bytes().unwrap(), 0);
1743 }
1744
1745 #[test]
1746 fn streaming_encoder_with_dictionary_roundtrips_and_carries_dict_id() {
1747 use alloc::format;
1748 let dict_raw = include_bytes!("../../dict_tests/dictionary");
1749 let dict_id = crate::decoding::Dictionary::decode_dict(dict_raw)
1750 .unwrap()
1751 .id;
1752
1753 let mut payload = Vec::new();
1757 for i in 0..400u32 {
1758 payload.extend_from_slice(
1759 format!("tenant=demo table=orders key={i} region=eu payload=aaaaabbbbbccccc\n")
1760 .as_bytes(),
1761 );
1762 }
1763
1764 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Level(19));
1765 encoder
1766 .set_dictionary_from_bytes(dict_raw)
1767 .expect("attach dictionary");
1768 for chunk in payload.chunks(777) {
1769 encoder.write_all(chunk).unwrap();
1770 }
1771 let compressed = encoder.finish().unwrap();
1772
1773 let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1776 .unwrap()
1777 .0;
1778 assert_eq!(header.dictionary_id(), Some(dict_id));
1779
1780 let mut decoder =
1782 StreamingDecoder::new_with_dictionary_bytes(compressed.as_slice(), dict_raw).unwrap();
1783 let mut decoded = Vec::new();
1784 decoder.read_to_end(&mut decoded).unwrap();
1785 assert_eq!(decoded, payload);
1786
1787 let mut nodict = StreamingEncoder::new(Vec::new(), CompressionLevel::Level(19));
1791 for chunk in payload.chunks(777) {
1792 nodict.write_all(chunk).unwrap();
1793 }
1794 let nodict_frame = nodict.finish().unwrap();
1795 assert!(
1796 compressed.len() <= nodict_frame.len(),
1797 "dict frame {} should not exceed no-dict frame {}",
1798 compressed.len(),
1799 nodict_frame.len()
1800 );
1801 }
1802
1803 #[test]
1804 fn streaming_encoder_strategy_override_survives_frame_start() {
1805 use crate::encoding::{CompressionParameters, Strategy};
1811 let level = CompressionLevel::Fastest;
1812 let level_tag = crate::encoding::strategy::StrategyTag::for_compression_level(level);
1813 let override_tag = Strategy::Greedy.tag();
1814 assert_ne!(
1815 level_tag, override_tag,
1816 "test needs an override that changes the derived tag"
1817 );
1818
1819 let params = CompressionParameters::builder(level)
1820 .strategy(Strategy::Greedy)
1821 .build()
1822 .unwrap();
1823 let payload = b"override must outlive the frame header";
1824 let mut encoder = StreamingEncoder::new(Vec::new(), level);
1825 encoder.set_parameters(¶ms).unwrap();
1826 encoder.write_all(payload).unwrap();
1827 assert_eq!(
1828 encoder.state.strategy_tag, override_tag,
1829 "strategy override was discarded when the frame started"
1830 );
1831
1832 let compressed = encoder.finish().unwrap();
1833 let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1834 let mut decoded = Vec::new();
1835 decoder.read_to_end(&mut decoded).unwrap();
1836 assert_eq!(decoded, payload);
1837 }
1838
1839 #[test]
1840 fn streaming_encoder_uncompressed_with_dictionary_omits_dict_id() {
1841 let dict_raw = include_bytes!("../../dict_tests/dictionary");
1847 let payload = b"tenant=demo table=orders region=eu payload=aaaaabbbbbccccc";
1848 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Uncompressed);
1849 encoder
1850 .set_dictionary_from_bytes(dict_raw)
1851 .expect("attach dictionary");
1852 encoder.write_all(payload).unwrap();
1853 let compressed = encoder.finish().unwrap();
1854
1855 let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
1856 .unwrap()
1857 .0;
1858 assert_eq!(
1859 header.dictionary_id(),
1860 None,
1861 "uncompressed frame must not require a dictionary at decode time"
1862 );
1863
1864 let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
1866 let mut decoded = Vec::new();
1867 decoder.read_to_end(&mut decoded).unwrap();
1868 assert_eq!(decoded, payload);
1869 }
1870}