1use alloc::format;
2use alloc::string::{String, ToString};
3use alloc::vec::Vec;
4use core::mem;
5
6use crate::common::MAX_BLOCK_SIZE;
7#[cfg(feature = "hash")]
8use core::hash::Hasher;
9#[cfg(feature = "hash")]
10use twox_hash::XxHash64;
11
12use crate::encoding::levels::compress_block_encoded;
13use crate::encoding::{
14 CompressionLevel, MatchGeneratorDriver, Matcher, block_header::BlockHeader,
15 frame_compressor::CompressState, frame_compressor::FseTables, frame_header::FrameHeader,
16};
17use crate::io::{Error, ErrorKind, Write};
18
/// A zstd frame encoder that compresses data incrementally as it is written.
///
/// Input is buffered in `pending` until a full block accumulates, which is
/// then encoded and forwarded to `drain`. `finish` consumes the encoder,
/// emits the final block (plus a checksum when the `hash` feature is on) and
/// returns the drain.
pub struct StreamingEncoder<W: Write, M: Matcher = MatchGeneratorDriver> {
    /// Output sink; `Some` until `finish` takes it.
    drain: Option<W>,
    /// Level selected at construction; decides raw vs. compressed blocks.
    compression_level: CompressionLevel,
    /// Per-frame compression state (matcher, Huffman/FSE tables, offsets).
    state: CompressState<M>,
    /// Bytes buffered for the next block, sized to one block capacity.
    pending: Vec<u8>,
    /// Reusable buffer for encoded block output, swapped in per block.
    encoded_scratch: Vec<u8>,
    /// Once set, every subsequent operation fails with `sticky_error`.
    errored: bool,
    /// Kind of the first failure, reported by later sticky errors.
    last_error_kind: Option<ErrorKind>,
    /// Message of the first failure, reported by later sticky errors.
    last_error_message: Option<String>,
    /// True once the frame header has been written to the drain.
    frame_started: bool,
    /// Exact content size promised via `set_pledged_content_size`.
    pledged_content_size: Option<u64>,
    /// Total uncompressed bytes accepted so far (validated against pledge).
    bytes_consumed: u64,
    #[cfg(feature = "hash")]
    /// Running xxhash64 of all uncompressed input for the frame checksum.
    hasher: XxHash64,
}
39
40impl<W: Write> StreamingEncoder<W, MatchGeneratorDriver> {
41 pub fn new(drain: W, compression_level: CompressionLevel) -> Self {
46 Self::new_with_matcher(
47 MatchGeneratorDriver::new(MAX_BLOCK_SIZE as usize, 1),
48 drain,
49 compression_level,
50 )
51 }
52}
53
54impl<W: Write, M: Matcher> StreamingEncoder<W, M> {
    /// Creates an encoder that uses the supplied `matcher` for match finding,
    /// writing compressed output to `drain`.
    ///
    /// Useful for plugging in custom `Matcher` implementations (e.g. in tests).
    pub fn new_with_matcher(matcher: M, drain: W, compression_level: CompressionLevel) -> Self {
        Self {
            drain: Some(drain),
            compression_level,
            state: CompressState {
                matcher,
                last_huff_table: None,
                fse_tables: FseTables::new(),
                // Initial repeat-offset history defined by the zstd format.
                offset_hist: [1, 4, 8],
            },
            pending: Vec::new(),
            encoded_scratch: Vec::new(),
            errored: false,
            last_error_kind: None,
            last_error_message: None,
            frame_started: false,
            pledged_content_size: None,
            bytes_consumed: 0,
            #[cfg(feature = "hash")]
            hasher: XxHash64::with_seed(0),
        }
    }
81
82 pub fn set_pledged_content_size(&mut self, size: u64) -> Result<(), Error> {
93 self.ensure_open()?;
94 if self.frame_started {
95 return Err(invalid_input_error(
96 "pledged content size must be set before the first write",
97 ));
98 }
99 self.pledged_content_size = Some(size);
100 self.state.matcher.set_source_size_hint(size);
103 Ok(())
104 }
105
106 pub fn set_source_size_hint(&mut self, size: u64) -> Result<(), Error> {
114 self.ensure_open()?;
115 if self.frame_started {
116 return Err(invalid_input_error(
117 "source size hint must be set before the first write",
118 ));
119 }
120 self.state.matcher.set_source_size_hint(size);
121 Ok(())
122 }
123
    /// Shared access to the underlying writer.
    ///
    /// # Panics
    /// Never in practice: the drain is only taken by `finish`, which consumes
    /// the encoder, so it is always present while `&self` exists.
    pub fn get_ref(&self) -> &W {
        self.drain
            .as_ref()
            .expect("streaming encoder drain is present until finish consumes self")
    }
133
    /// Mutable access to the underlying writer.
    ///
    /// Writing to the drain directly may corrupt the frame being produced.
    ///
    /// # Panics
    /// Never in practice: the drain is only taken by `finish`, which consumes
    /// the encoder, so it is always present while `&mut self` exists.
    pub fn get_mut(&mut self) -> &mut W {
        self.drain
            .as_mut()
            .expect("streaming encoder drain is present until finish consumes self")
    }
146
    /// Completes the frame and returns the drain.
    ///
    /// Validates the pledged content size (if any) against the bytes actually
    /// consumed, writes the frame header if no write ever did, emits the final
    /// block (an empty one when nothing is buffered), appends the content
    /// checksum when the `hash` feature is enabled, and flushes the drain.
    pub fn finish(mut self) -> Result<W, Error> {
        self.ensure_open()?;

        // Reject a short frame before emitting any further output.
        if let Some(pledged) = self.pledged_content_size
            && self.bytes_consumed != pledged
        {
            return Err(invalid_input_error(
                "pledged content size does not match bytes consumed",
            ));
        }

        self.ensure_frame_started()?;

        // A frame must end with a block flagged `last_block`, even if empty.
        if self.pending.is_empty() {
            self.write_empty_last_block()
                .map_err(|err| self.fail(err))?;
        } else {
            self.emit_pending_block(true)?;
        }

        let mut drain = self
            .drain
            .take()
            .expect("streaming encoder drain must be present when finishing");

        #[cfg(feature = "hash")]
        {
            // Content checksum: low 32 bits of the xxhash64 of all input,
            // little-endian, appended after the last block.
            let checksum = self.hasher.finish() as u32;
            drain
                .write_all(&checksum.to_le_bytes())
                .map_err(|err| self.fail(err))?;
        }

        drain.flush().map_err(|err| self.fail(err))?;
        Ok(drain)
    }
190
191 fn ensure_open(&self) -> Result<(), Error> {
192 if self.errored {
193 return Err(self.sticky_error());
194 }
195 Ok(())
196 }
197
    /// Builds the error returned by every call after the encoder is poisoned,
    /// reusing whatever kind and/or message `fail` recorded.
    ///
    /// Keeping the original kind lets callers keep matching on it (e.g.
    /// `BrokenPipe`) across subsequent calls.
    fn sticky_error(&self) -> Error {
        match (self.last_error_kind, self.last_error_message.as_deref()) {
            (Some(kind), Some(message)) => error_with_kind_message(
                kind,
                format!(
                    "streaming encoder is in an errored state due to previous {kind:?} failure: {message}"
                ),
            ),
            (Some(kind), None) => error_from_kind(kind),
            (None, Some(message)) => other_error_owned(format!(
                "streaming encoder is in an errored state: {message}"
            )),
            (None, None) => other_error("streaming encoder is in an errored state"),
        }
    }
216
217 fn drain_mut(&mut self) -> Result<&mut W, Error> {
218 self.drain
219 .as_mut()
220 .ok_or_else(|| other_error("streaming encoder has no active drain"))
221 }
222
    /// Writes the frame header on the first write/flush/finish; a no-op on
    /// every later call.
    ///
    /// Resets all per-frame compression state (matcher, repeat offsets,
    /// Huffman table, FSE tables, hasher) so a frame never starts with stale
    /// tables from a previous one.
    fn ensure_frame_started(&mut self) -> Result<(), Error> {
        if self.frame_started {
            return Ok(());
        }

        self.ensure_level_supported()?;
        self.state.matcher.reset(self.compression_level);
        // Initial repeat-offset history defined by the zstd format.
        self.state.offset_hist = [1, 4, 8];
        self.state.last_huff_table = None;
        self.state.fse_tables.ll_previous = None;
        self.state.fse_tables.ml_previous = None;
        self.state.fse_tables.of_previous = None;
        #[cfg(feature = "hash")]
        {
            self.hasher = XxHash64::with_seed(0);
        }

        let window_size = self.state.matcher.window_size();
        if window_size == 0 {
            return Err(invalid_input_error(
                "matcher reported window_size == 0, which is invalid",
            ));
        }

        // Single-segment frames omit the window descriptor and rely on the
        // frame content size instead. Only chosen for mid-sized pledges
        // (512 B ..= 16 KiB) that also fit within the matcher's window.
        let single_segment = self
            .pledged_content_size
            .map(|size| (512..=(1 << 14)).contains(&size) && size <= window_size)
            .unwrap_or(false);

        let header = FrameHeader {
            frame_content_size: self.pledged_content_size,
            single_segment,
            content_checksum: cfg!(feature = "hash"),
            dictionary_id: None,
            window_size: if single_segment {
                None
            } else {
                Some(window_size)
            },
        };
        let mut encoded_header = Vec::new();
        header.serialize(&mut encoded_header);
        self.drain_mut()
            .and_then(|drain| drain.write_all(&encoded_header))
            .map_err(|err| self.fail(err))?;

        self.frame_started = true;
        Ok(())
    }
277
278 fn block_capacity(&self) -> usize {
279 let matcher_window = self.state.matcher.window_size() as usize;
280 core::cmp::max(1, core::cmp::min(matcher_window, MAX_BLOCK_SIZE as usize))
281 }
282
283 fn allocate_pending_space(&mut self, block_capacity: usize) -> Vec<u8> {
284 let mut space = match self.compression_level {
285 CompressionLevel::Fastest
286 | CompressionLevel::Default
287 | CompressionLevel::Better
288 | CompressionLevel::Best
289 | CompressionLevel::Level(_) => self.state.matcher.get_next_space(),
290 CompressionLevel::Uncompressed => Vec::new(),
291 };
292 space.clear();
293 if space.capacity() > block_capacity {
294 space.shrink_to(block_capacity);
295 }
296 if space.capacity() < block_capacity {
297 space.reserve(block_capacity - space.capacity());
298 }
299 space
300 }
301
    /// Emits `pending` as a non-last block if it has grown to exactly
    /// `block_capacity` bytes; otherwise does nothing.
    ///
    /// Returns `None` when the caller should keep going (nothing to emit, or
    /// the block was written successfully). On a write failure the encoder is
    /// poisoned and the unwritten data is restored into `pending`; the result
    /// is then `Ok(consumed)` when this `write` call already accepted some
    /// bytes (reported as a short write), or the error itself when no
    /// progress was made.
    fn emit_full_pending_block(
        &mut self,
        block_capacity: usize,
        consumed: usize,
    ) -> Option<Result<usize, Error>> {
        if self.pending.len() != block_capacity {
            return None;
        }

        // Swap in a fresh buffer so `pending` stays usable if encoding fails.
        let new_pending = self.allocate_pending_space(block_capacity);
        let full_block = mem::replace(&mut self.pending, new_pending);
        if let Err((err, restored_block)) = self.encode_block(full_block, false) {
            self.pending = restored_block;
            let err = self.fail(err);
            if consumed > 0 {
                return Some(Ok(consumed));
            }
            return Some(Err(err));
        }
        None
    }
323
    /// Encodes and writes whatever is buffered in `pending` as one block.
    ///
    /// On failure the unwritten data is restored into `pending` and the
    /// encoder is poisoned. For non-last blocks a fresh pending buffer is
    /// allocated so subsequent writes can keep filling it.
    fn emit_pending_block(&mut self, last_block: bool) -> Result<(), Error> {
        let block = mem::take(&mut self.pending);
        if let Err((err, restored_block)) = self.encode_block(block, last_block) {
            self.pending = restored_block;
            return Err(self.fail(err));
        }
        if !last_block {
            let block_capacity = self.block_capacity();
            self.pending = self.allocate_pending_space(block_capacity);
        }
        Ok(())
    }
336
    /// Checks that the configured compression level is implemented.
    ///
    /// Currently every variant is supported, so this always returns `Ok`.
    /// The match is deliberately exhaustive (no wildcard arm) so adding a new
    /// `CompressionLevel` variant forces a compile error — and a decision —
    /// here.
    fn ensure_level_supported(&self) -> Result<(), Error> {
        match self.compression_level {
            CompressionLevel::Uncompressed
            | CompressionLevel::Fastest
            | CompressionLevel::Default
            | CompressionLevel::Better
            | CompressionLevel::Best
            | CompressionLevel::Level(_) => Ok(()),
        }
    }
350
    /// Encodes one block (raw or compressed depending on level) and writes it
    /// to the drain.
    ///
    /// On a drain write failure the error is returned together with the
    /// uncompressed data so the caller can restore it into `pending`; the
    /// encoder itself is NOT poisoned here — callers decide that. The
    /// `encoded_scratch` buffer is swapped in and out so its capacity is
    /// reused across blocks instead of reallocating each time.
    fn encode_block(
        &mut self,
        uncompressed_data: Vec<u8>,
        last_block: bool,
    ) -> Result<(), (Error, Vec<u8>)> {
        let mut raw_block = Some(uncompressed_data);
        // Borrow the scratch buffer for the encoded output.
        let mut encoded = Vec::new();
        mem::swap(&mut encoded, &mut self.encoded_scratch);
        encoded.clear();
        // Worst case: one full block plus the 3-byte block header.
        let needed_capacity = self.block_capacity() + 3;
        if encoded.capacity() < needed_capacity {
            encoded.reserve(needed_capacity.saturating_sub(encoded.len()));
        }
        // Set when the raw data is handed to the matcher (compressed path);
        // in that case it must be recovered via `get_last_space`.
        let mut moved_into_matcher = false;
        if raw_block.as_ref().is_some_and(|block| block.is_empty()) {
            // Zero-length raw block: header only (used as the empty last block).
            let header = BlockHeader {
                last_block,
                block_type: crate::blocks::block::BlockType::Raw,
                block_size: 0,
            };
            header.serialize(&mut encoded);
        } else {
            match self.compression_level {
                CompressionLevel::Uncompressed => {
                    // Store the payload verbatim behind a raw block header.
                    let block = raw_block.as_ref().expect("raw block missing");
                    let header = BlockHeader {
                        last_block,
                        block_type: crate::blocks::block::BlockType::Raw,
                        block_size: block.len() as u32,
                    };
                    header.serialize(&mut encoded);
                    encoded.extend_from_slice(block);
                }
                CompressionLevel::Fastest
                | CompressionLevel::Default
                | CompressionLevel::Better
                | CompressionLevel::Best
                | CompressionLevel::Level(_) => {
                    // Ownership of the raw bytes moves into the matcher.
                    let block = raw_block.take().expect("raw block missing");
                    debug_assert!(!block.is_empty(), "empty blocks handled above");
                    compress_block_encoded(
                        &mut self.state,
                        self.compression_level,
                        last_block,
                        block,
                        &mut encoded,
                    );
                    moved_into_matcher = true;
                }
            }
        }

        if let Err(err) = self.drain_mut().and_then(|drain| drain.write_all(&encoded)) {
            // Hand the scratch buffer back before bailing out, and recover the
            // uncompressed bytes so the caller can restore `pending`.
            encoded.clear();
            mem::swap(&mut encoded, &mut self.encoded_scratch);
            let restored = if moved_into_matcher {
                self.state.matcher.get_last_space().to_vec()
            } else {
                raw_block.unwrap_or_default()
            };
            return Err((err, restored));
        }

        // Fold the uncompressed bytes into the frame checksum (hash feature
        // only; `hash_block` is a no-op otherwise).
        if moved_into_matcher {
            #[cfg(feature = "hash")]
            {
                self.hasher.write(self.state.matcher.get_last_space());
            }
        } else {
            self.hash_block(raw_block.as_deref().unwrap_or(&[]));
        }
        encoded.clear();
        mem::swap(&mut encoded, &mut self.encoded_scratch);
        Ok(())
    }
426
427 fn write_empty_last_block(&mut self) -> Result<(), Error> {
428 self.encode_block(Vec::new(), true).map_err(|(err, _)| err)
429 }
430
431 fn fail(&mut self, err: Error) -> Error {
432 self.errored = true;
433 if self.last_error_kind.is_none() {
434 self.last_error_kind = Some(err.kind());
435 }
436 if self.last_error_message.is_none() {
437 self.last_error_message = Some(err.to_string());
438 }
439 err
440 }
441
    /// Feeds uncompressed bytes into the frame checksum.
    #[cfg(feature = "hash")]
    fn hash_block(&mut self, uncompressed_data: &[u8]) {
        self.hasher.write(uncompressed_data);
    }

    /// No-op stand-in when the `hash` feature is disabled.
    #[cfg(not(feature = "hash"))]
    fn hash_block(&mut self, _uncompressed_data: &[u8]) {}
449}
450
451impl<W: Write, M: Matcher> Write for StreamingEncoder<W, M> {
    /// Buffers `buf`, emitting full blocks to the drain as they fill up.
    ///
    /// Returns the number of bytes accepted, which may be less than
    /// `buf.len()` when a pledged content size caps the input, or when a
    /// block write fails after earlier bytes of this call already made
    /// progress (the encoder is then poisoned for subsequent calls).
    fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
        self.ensure_open()?;
        if buf.is_empty() {
            return Ok(0);
        }

        // Reject outright once the pledge has been fully consumed.
        if let Some(pledged) = self.pledged_content_size
            && self.bytes_consumed >= pledged
        {
            return Err(invalid_input_error(
                "write would exceed pledged content size",
            ));
        }

        self.ensure_frame_started()?;

        // Truncate the input to the bytes still allowed by the pledge so a
        // straddling write reports partial progress instead of failing.
        let buf = if let Some(pledged) = self.pledged_content_size {
            let remaining_allowed = pledged
                .checked_sub(self.bytes_consumed)
                .ok_or_else(|| invalid_input_error("bytes consumed exceed pledged content size"))?;
            if remaining_allowed == 0 {
                return Err(invalid_input_error(
                    "write would exceed pledged content size",
                ));
            }
            let accepted = core::cmp::min(
                buf.len(),
                usize::try_from(remaining_allowed).unwrap_or(usize::MAX),
            );
            &buf[..accepted]
        } else {
            buf
        };

        let block_capacity = self.block_capacity();
        if self.pending.capacity() == 0 {
            self.pending = self.allocate_pending_space(block_capacity);
        }
        let mut remaining = buf;
        let mut consumed = 0usize;

        while !remaining.is_empty() {
            // A previous call may have left `pending` exactly full.
            if let Some(result) = self.emit_full_pending_block(block_capacity, consumed) {
                return result;
            }

            let available = block_capacity - self.pending.len();
            let to_take = core::cmp::min(remaining.len(), available);
            if to_take == 0 {
                break;
            }
            self.pending.extend_from_slice(&remaining[..to_take]);
            remaining = &remaining[to_take..];
            consumed += to_take;

            // Emit immediately once the block is full so a boundary-sized
            // write produces output within the same call.
            if let Some(result) = self.emit_full_pending_block(block_capacity, consumed) {
                if let Ok(n) = &result {
                    self.bytes_consumed += *n as u64;
                }
                return result;
            }
        }
        self.bytes_consumed += consumed as u64;
        Ok(consumed)
    }
523
    /// Emits any buffered partial block and flushes the drain.
    ///
    /// When nothing is buffered — including before the very first write —
    /// only the drain is flushed, so flushing a fresh encoder emits no frame
    /// header.
    fn flush(&mut self) -> Result<(), Error> {
        self.ensure_open()?;
        if self.pending.is_empty() {
            return self
                .drain_mut()
                .and_then(|drain| drain.flush())
                .map_err(|err| self.fail(err));
        }
        self.ensure_frame_started()?;
        // Emit the partial block even though it is not full.
        self.emit_pending_block(false)?;
        self.drain_mut()
            .and_then(|drain| drain.flush())
            .map_err(|err| self.fail(err))
    }
538}
539
/// Builds an `Error` carrying only an `ErrorKind`, with no message payload.
fn error_from_kind(kind: ErrorKind) -> Error {
    kind.into()
}
543
/// Creates an `Error` with both a kind and a message, papering over the
/// std/no_std signature difference of `Error::new` (the no-std variant takes
/// a boxed payload).
fn error_with_kind_message(kind: ErrorKind, message: String) -> Error {
    #[cfg(feature = "std")]
    {
        Error::new(kind, message)
    }
    #[cfg(not(feature = "std"))]
    {
        Error::new(kind, alloc::boxed::Box::new(message))
    }
}
554
/// Creates an `InvalidInput` error carrying `message`.
///
/// NOTE: under `no_std` the kind degrades to `ErrorKind::Other`, so code (and
/// tests) matching on `InvalidInput` only observe that kind on `std` builds.
fn invalid_input_error(message: &str) -> Error {
    #[cfg(feature = "std")]
    {
        Error::new(ErrorKind::InvalidInput, message)
    }
    #[cfg(not(feature = "std"))]
    {
        Error::new(
            ErrorKind::Other,
            alloc::boxed::Box::new(alloc::string::String::from(message)),
        )
    }
}
568
/// Creates an `Other`-kind error from an owned message string.
fn other_error_owned(message: String) -> Error {
    #[cfg(feature = "std")]
    {
        Error::other(message)
    }
    #[cfg(not(feature = "std"))]
    {
        Error::new(ErrorKind::Other, alloc::boxed::Box::new(message))
    }
}
579
/// Creates an `Other`-kind error from a borrowed message string.
fn other_error(message: &str) -> Error {
    #[cfg(feature = "std")]
    {
        Error::other(message)
    }
    #[cfg(not(feature = "std"))]
    {
        Error::new(
            ErrorKind::Other,
            alloc::boxed::Box::new(alloc::string::String::from(message)),
        )
    }
}
593
// Fixes two formatting defects (statements jammed onto a single line in
// `pledged_content_size_mismatch_returns_error` and
// `single_segment_requires_pledged_to_fit_matcher_window`) and documents the
// test helpers. Test behavior is unchanged.
#[cfg(test)]
mod tests {
    use crate::decoding::StreamingDecoder;
    use crate::encoding::{CompressionLevel, Matcher, Sequence, StreamingEncoder};
    use crate::io::{Error, ErrorKind, Read, Write};
    use alloc::vec;
    use alloc::vec::Vec;

    /// Minimal matcher with a configurable window that never finds matches:
    /// `start_matching` emits the whole committed buffer as literals.
    struct TinyMatcher {
        last_space: Vec<u8>,
        window_size: u64,
    }

    impl TinyMatcher {
        fn new(window_size: u64) -> Self {
            Self {
                last_space: Vec::new(),
                window_size,
            }
        }
    }

    impl Matcher for TinyMatcher {
        fn get_next_space(&mut self) -> Vec<u8> {
            vec![0; self.window_size as usize]
        }

        fn get_last_space(&mut self) -> &[u8] {
            self.last_space.as_slice()
        }

        fn commit_space(&mut self, space: Vec<u8>) {
            self.last_space = space;
        }

        fn skip_matching(&mut self) {}

        fn start_matching(&mut self, mut handle_sequence: impl for<'a> FnMut(Sequence<'a>)) {
            handle_sequence(Sequence::Literals {
                literals: self.last_space.as_slice(),
            });
        }

        fn reset(&mut self, _level: CompressionLevel) {
            self.last_space.clear();
        }

        fn window_size(&self) -> u64 {
            self.window_size
        }
    }

    /// Writer that rejects the N-th `write` call with a generic error and
    /// records everything else in `sink`.
    struct FailingWriteOnce {
        writes: usize,
        fail_on_write_number: usize,
        sink: Vec<u8>,
    }

    impl FailingWriteOnce {
        fn new(fail_on_write_number: usize) -> Self {
            Self {
                writes: 0,
                fail_on_write_number,
                sink: Vec::new(),
            }
        }
    }

    impl Write for FailingWriteOnce {
        fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
            self.writes += 1;
            if self.writes == self.fail_on_write_number {
                return Err(super::other_error("injected write failure"));
            }
            self.sink.extend_from_slice(buf);
            Ok(buf.len())
        }

        fn flush(&mut self) -> Result<(), Error> {
            Ok(())
        }
    }

    /// Writer that rejects the N-th `write` call with a specific `ErrorKind`,
    /// used to verify the sticky error preserves the original kind.
    struct FailingWithKind {
        writes: usize,
        fail_on_write_number: usize,
        kind: ErrorKind,
    }

    impl FailingWithKind {
        fn new(fail_on_write_number: usize, kind: ErrorKind) -> Self {
            Self {
                writes: 0,
                fail_on_write_number,
                kind,
            }
        }
    }

    impl Write for FailingWithKind {
        fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
            self.writes += 1;
            if self.writes == self.fail_on_write_number {
                return Err(Error::from(self.kind));
            }
            Ok(buf.len())
        }

        fn flush(&mut self) -> Result<(), Error> {
            Ok(())
        }
    }

    /// Writer that accepts only a prefix of the N-th `write` call and fails
    /// every call after that, exercising partial-write recovery.
    struct PartialThenFailWriter {
        writes: usize,
        fail_on_write_number: usize,
        partial_prefix_len: usize,
        terminal_failure: bool,
        sink: Vec<u8>,
    }

    impl PartialThenFailWriter {
        fn new(fail_on_write_number: usize, partial_prefix_len: usize) -> Self {
            Self {
                writes: 0,
                fail_on_write_number,
                partial_prefix_len,
                terminal_failure: false,
                sink: Vec::new(),
            }
        }
    }

    impl Write for PartialThenFailWriter {
        fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
            if self.terminal_failure {
                return Err(super::other_error("injected terminal write failure"));
            }

            self.writes += 1;
            if self.writes == self.fail_on_write_number {
                let written = core::cmp::min(self.partial_prefix_len, buf.len());
                if written > 0 {
                    self.sink.extend_from_slice(&buf[..written]);
                    self.terminal_failure = true;
                    return Ok(written);
                }
                return Err(super::other_error("injected terminal write failure"));
            }

            self.sink.extend_from_slice(buf);
            Ok(buf.len())
        }

        fn flush(&mut self) -> Result<(), Error> {
            Ok(())
        }
    }

    #[test]
    fn streaming_encoder_roundtrip_multiple_writes() {
        let payload = b"streaming-encoder-roundtrip-".repeat(1024);
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        // Odd chunk size so writes straddle block boundaries.
        for chunk in payload.chunks(313) {
            encoder.write_all(chunk).unwrap();
        }
        let compressed = encoder.finish().unwrap();

        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }

    #[test]
    fn flush_emits_nonempty_partial_output() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.write_all(b"partial-block").unwrap();
        encoder.flush().unwrap();
        let flushed_len = encoder.get_ref().len();
        assert!(
            flushed_len > 0,
            "flush should emit header+partial block bytes"
        );
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, b"partial-block");
    }

    #[test]
    fn flush_without_writes_does_not_emit_frame_header() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.flush().unwrap();
        assert!(encoder.get_ref().is_empty());
    }

    #[test]
    fn block_boundary_write_emits_block_in_same_call() {
        // 4-byte window: "ABCD" fills a block exactly, "ABC" does not.
        let mut boundary = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            Vec::new(),
            CompressionLevel::Uncompressed,
        );
        let mut below = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            Vec::new(),
            CompressionLevel::Uncompressed,
        );

        boundary.write_all(b"ABCD").unwrap();
        below.write_all(b"ABC").unwrap();

        let boundary_len = boundary.get_ref().len();
        let below_len = below.get_ref().len();
        assert!(
            boundary_len > below_len,
            "full block should be emitted immediately at block boundary"
        );
    }

    #[test]
    fn finish_consumes_encoder_and_emits_frame() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.write_all(b"abc").unwrap();
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, b"abc");
    }

    #[test]
    fn finish_without_writes_emits_empty_frame() {
        let encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert!(decoded.is_empty());
    }

    #[test]
    fn write_empty_buffer_returns_zero() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        assert_eq!(encoder.write(&[]).unwrap(), 0);
        let _ = encoder.finish().unwrap();
    }

    #[test]
    fn uncompressed_level_roundtrip() {
        let payload = b"uncompressed-streaming-roundtrip".repeat(64);
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Uncompressed);
        for chunk in payload.chunks(41) {
            encoder.write_all(chunk).unwrap();
        }
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }

    #[test]
    fn better_level_streaming_roundtrip() {
        let payload = b"better-level-streaming-test".repeat(256);
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Better);
        for chunk in payload.chunks(53) {
            encoder.write_all(chunk).unwrap();
        }
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }

    #[test]
    fn zero_window_matcher_returns_invalid_input_error() {
        let mut encoder = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(0),
            Vec::new(),
            CompressionLevel::Fastest,
        );
        let err = encoder.write_all(b"payload").unwrap_err();
        assert_eq!(err.kind(), ErrorKind::InvalidInput);
    }

    #[test]
    fn best_level_streaming_roundtrip() {
        let payload = b"best-level-streaming-test".repeat(8 * 1024);
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Best);
        for chunk in payload.chunks(53) {
            encoder.write_all(chunk).unwrap();
        }
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }

    #[test]
    fn write_failure_poisoning_is_sticky() {
        // First write fails (frame header), so nothing reaches the sink and
        // every later operation keeps failing.
        let mut encoder = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            FailingWriteOnce::new(1),
            CompressionLevel::Uncompressed,
        );

        assert!(encoder.write_all(b"ABCD").is_err());
        assert!(encoder.flush().is_err());
        assert!(encoder.write_all(b"EFGH").is_err());
        assert_eq!(encoder.get_ref().sink.len(), 0);
        assert!(encoder.finish().is_err());
    }

    #[test]
    fn poisoned_encoder_returns_original_error_kind() {
        let mut encoder = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            FailingWithKind::new(1, ErrorKind::BrokenPipe),
            CompressionLevel::Uncompressed,
        );

        let first_error = encoder.write_all(b"ABCD").unwrap_err();
        assert_eq!(first_error.kind(), ErrorKind::BrokenPipe);

        // The sticky error must preserve the original kind.
        let second_error = encoder.write_all(b"EFGH").unwrap_err();
        assert_eq!(second_error.kind(), ErrorKind::BrokenPipe);
    }

    #[test]
    fn write_reports_progress_but_poisoning_is_sticky_after_later_block_failure() {
        // Write #3 (second data block) fails: the first call reports the 8
        // bytes that made progress, then the encoder is poisoned.
        let payload = b"ABCDEFGHIJKL";
        let mut encoder = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            FailingWriteOnce::new(3),
            CompressionLevel::Uncompressed,
        );

        let first_write = encoder.write(payload).unwrap();
        assert_eq!(first_write, 8);
        assert!(encoder.write(&payload[first_write..]).is_err());
        assert!(encoder.flush().is_err());
        assert!(encoder.write_all(b"EFGH").is_err());
    }

    #[test]
    fn partial_write_failure_after_progress_poisons_encoder() {
        let payload = b"ABCDEFGHIJKL";
        let mut encoder = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            PartialThenFailWriter::new(3, 1),
            CompressionLevel::Uncompressed,
        );

        let first_write = encoder.write(payload).unwrap();
        assert_eq!(first_write, 8);

        let second_write = encoder.write(&payload[first_write..]);
        assert!(second_write.is_err());
        assert!(encoder.flush().is_err());
        assert!(encoder.write_all(b"MNOP").is_err());
    }

    #[test]
    fn new_with_matcher_and_get_mut_work() {
        let matcher = TinyMatcher::new(128 * 1024);
        let mut encoder =
            StreamingEncoder::new_with_matcher(matcher, Vec::new(), CompressionLevel::Fastest);
        encoder.get_mut().extend_from_slice(b"");
        encoder.write_all(b"custom-matcher").unwrap();
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, b"custom-matcher");
    }

    #[cfg(feature = "std")]
    #[test]
    fn streaming_encoder_output_decompresses_with_c_zstd() {
        let payload = b"tenant=demo op=put key=streaming value=abcdef\n".repeat(4096);
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        for chunk in payload.chunks(1024) {
            encoder.write_all(chunk).unwrap();
        }
        let compressed = encoder.finish().unwrap();

        let mut decoded = Vec::with_capacity(payload.len());
        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }

    #[test]
    fn pledged_content_size_written_in_header() {
        let payload = b"hello world, pledged size test";
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder
            .set_pledged_content_size(payload.len() as u64)
            .unwrap();
        encoder.write_all(payload).unwrap();
        let compressed = encoder.finish().unwrap();

        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
            .unwrap()
            .0;
        assert_eq!(header.frame_content_size(), payload.len() as u64);

        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }

    #[test]
    fn pledged_content_size_mismatch_returns_error() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.set_pledged_content_size(100).unwrap();
        encoder.write_all(b"short payload").unwrap();
        let err = encoder.finish().unwrap_err();
        assert_eq!(err.kind(), ErrorKind::InvalidInput);
    }

    #[test]
    fn write_exceeding_pledge_returns_error() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.set_pledged_content_size(5).unwrap();
        let err = encoder.write_all(b"exceeds five bytes").unwrap_err();
        assert_eq!(err.kind(), ErrorKind::InvalidInput);
    }

    #[test]
    fn write_straddling_pledge_reports_partial_progress() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.set_pledged_content_size(5).unwrap();
        // The straddling write is truncated to the pledge, not rejected.
        assert_eq!(encoder.write(b"abcdef").unwrap(), 5);
        let err = encoder.write(b"g").unwrap_err();
        assert_eq!(err.kind(), ErrorKind::InvalidInput);
    }

    #[test]
    fn encoded_scratch_capacity_is_reused_across_blocks() {
        let payload = vec![0xAB; 64 * 3];
        let mut encoder = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(64),
            Vec::new(),
            CompressionLevel::Uncompressed,
        );

        encoder.write_all(&payload[..64]).unwrap();
        let first_capacity = encoder.encoded_scratch.capacity();
        assert!(
            first_capacity >= 67,
            "expected encoded scratch to keep block header + payload capacity",
        );

        encoder.write_all(&payload[64..128]).unwrap();
        let second_capacity = encoder.encoded_scratch.capacity();
        assert!(
            second_capacity >= first_capacity,
            "encoded scratch capacity should be reused across block emits",
        );

        encoder.write_all(&payload[128..]).unwrap();
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }

    #[test]
    fn pledged_content_size_after_write_returns_error() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.write_all(b"already writing").unwrap();
        let err = encoder.set_pledged_content_size(15).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::InvalidInput);
    }

    #[test]
    fn source_size_hint_directly_reduces_window_header() {
        let payload = b"streaming-source-size-hint".repeat(64);

        let mut no_hint = StreamingEncoder::new(Vec::new(), CompressionLevel::from_level(11));
        no_hint.write_all(payload.as_slice()).unwrap();
        let no_hint_frame = no_hint.finish().unwrap();
        let no_hint_header = crate::decoding::frame::read_frame_header(no_hint_frame.as_slice())
            .unwrap()
            .0;
        let no_hint_window = no_hint_header.window_size().unwrap();

        let mut with_hint = StreamingEncoder::new(Vec::new(), CompressionLevel::from_level(11));
        with_hint
            .set_source_size_hint(payload.len() as u64)
            .unwrap();
        with_hint.write_all(payload.as_slice()).unwrap();
        // Setting the hint after writing must be rejected.
        let late_hint_err = with_hint
            .set_source_size_hint(payload.len() as u64)
            .unwrap_err();
        assert_eq!(late_hint_err.kind(), ErrorKind::InvalidInput);
        let with_hint_frame = with_hint.finish().unwrap();
        let with_hint_header =
            crate::decoding::frame::read_frame_header(with_hint_frame.as_slice())
                .unwrap()
                .0;
        let with_hint_window = with_hint_header.window_size().unwrap();

        assert!(
            with_hint_window <= no_hint_window,
            "source size hint should not increase advertised window"
        );

        let mut decoder = StreamingDecoder::new(with_hint_frame.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }

    #[cfg(feature = "std")]
    #[test]
    fn pledged_content_size_c_zstd_compatible() {
        let payload = b"tenant=demo op=put key=streaming value=abcdef\n".repeat(4096);
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder
            .set_pledged_content_size(payload.len() as u64)
            .unwrap();
        for chunk in payload.chunks(1024) {
            encoder.write_all(chunk).unwrap();
        }
        let compressed = encoder.finish().unwrap();

        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
            .unwrap()
            .0;
        assert_eq!(header.frame_content_size(), payload.len() as u64);

        let mut decoded = Vec::new();
        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }

    #[test]
    fn single_segment_requires_pledged_to_fit_matcher_window() {
        // Pledge (1320 bytes) is in the single-segment size range but exceeds
        // the 1024-byte matcher window, so single-segment must stay off.
        let payload = b"streaming-window-gate-".repeat(60);
        let mut encoder = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(1024),
            Vec::new(),
            CompressionLevel::Fastest,
        );
        encoder
            .set_pledged_content_size(payload.len() as u64)
            .unwrap();
        encoder.write_all(payload.as_slice()).unwrap();
        let compressed = encoder.finish().unwrap();

        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
            .unwrap()
            .0;
        assert_eq!(header.frame_content_size(), payload.len() as u64);
        assert!(
            !header.descriptor.single_segment_flag(),
            "single-segment must stay off when pledged content size exceeds matcher window"
        );
        assert!(
            header.window_size().unwrap() >= 1024,
            "window descriptor should be present when single-segment is disabled"
        );
    }

    #[test]
    fn no_pledged_size_omits_fcs_from_header() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.write_all(b"no pledged size").unwrap();
        let compressed = encoder.finish().unwrap();

        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
            .unwrap()
            .0;
        assert_eq!(header.frame_content_size(), 0);
        assert_eq!(header.descriptor.frame_content_size_bytes().unwrap(), 0);
    }
}
1189}