1use alloc::format;
2use alloc::string::{String, ToString};
3use alloc::vec::Vec;
4use core::mem;
5
6use crate::common::MAX_BLOCK_SIZE;
7#[cfg(feature = "hash")]
8use core::hash::Hasher;
9#[cfg(feature = "hash")]
10use twox_hash::XxHash64;
11
12use crate::encoding::levels::compress_block_encoded;
13use crate::encoding::{
14 CompressionLevel, MatchGeneratorDriver, Matcher, block_header::BlockHeader,
15 frame_compressor::CompressState, frame_compressor::FseTables, frame_header::FrameHeader,
16};
17use crate::io::{Error, ErrorKind, Write};
18
/// Compresses data written into it as a single zstd frame, pushing encoded
/// blocks into the wrapped [`Write`] drain as they fill up.
///
/// The frame header is emitted lazily on the first successful write (or by
/// `finish` for an empty frame); `finish` must be called to terminate the
/// frame and recover the drain.
pub struct StreamingEncoder<W: Write, M: Matcher = MatchGeneratorDriver> {
    /// Output sink; `Some` until `finish` consumes `self` and takes it out.
    drain: Option<W>,
    /// Chooses between raw blocks (`Uncompressed`) and compressed encodings.
    compression_level: CompressionLevel,
    /// Matcher plus Huffman/FSE table state carried across the frame's blocks.
    state: CompressState<M>,
    /// Uncompressed bytes buffered until a full block can be emitted.
    pending: Vec<u8>,
    /// Once set, every subsequent operation fails with `sticky_error`.
    errored: bool,
    /// Kind captured from the first failure, replayed by `sticky_error`.
    last_error_kind: Option<ErrorKind>,
    /// Message captured from the first failure, replayed by `sticky_error`.
    last_error_message: Option<String>,
    /// Whether the frame header has already been written to the drain.
    frame_started: bool,
    /// Exact content size promised for the frame header, if declared.
    pledged_content_size: Option<u64>,
    /// Total uncompressed bytes accepted so far (validated against the pledge).
    bytes_consumed: u64,
    /// Running XxHash64 of the uncompressed content for the frame checksum.
    #[cfg(feature = "hash")]
    hasher: XxHash64,
}
38
39impl<W: Write> StreamingEncoder<W, MatchGeneratorDriver> {
40 pub fn new(drain: W, compression_level: CompressionLevel) -> Self {
45 Self::new_with_matcher(
46 MatchGeneratorDriver::new(MAX_BLOCK_SIZE as usize, 1),
47 drain,
48 compression_level,
49 )
50 }
51}
52
53impl<W: Write, M: Matcher> StreamingEncoder<W, M> {
54 pub fn new_with_matcher(matcher: M, drain: W, compression_level: CompressionLevel) -> Self {
59 Self {
60 drain: Some(drain),
61 compression_level,
62 state: CompressState {
63 matcher,
64 last_huff_table: None,
65 fse_tables: FseTables::new(),
66 offset_hist: [1, 4, 8],
67 },
68 pending: Vec::new(),
69 errored: false,
70 last_error_kind: None,
71 last_error_message: None,
72 frame_started: false,
73 pledged_content_size: None,
74 bytes_consumed: 0,
75 #[cfg(feature = "hash")]
76 hasher: XxHash64::with_seed(0),
77 }
78 }
79
80 pub fn set_pledged_content_size(&mut self, size: u64) -> Result<(), Error> {
91 self.ensure_open()?;
92 if self.frame_started {
93 return Err(invalid_input_error(
94 "pledged content size must be set before the first write",
95 ));
96 }
97 self.pledged_content_size = Some(size);
98 self.state.matcher.set_source_size_hint(size);
101 Ok(())
102 }
103
104 pub fn set_source_size_hint(&mut self, size: u64) -> Result<(), Error> {
112 self.ensure_open()?;
113 if self.frame_started {
114 return Err(invalid_input_error(
115 "source size hint must be set before the first write",
116 ));
117 }
118 self.state.matcher.set_source_size_hint(size);
119 Ok(())
120 }
121
    /// Returns a shared reference to the underlying drain.
    ///
    /// # Panics
    /// Never in practice: the drain is only taken out by `finish`, which
    /// consumes `self`, so it is always present here.
    pub fn get_ref(&self) -> &W {
        self.drain
            .as_ref()
            .expect("streaming encoder drain is present until finish consumes self")
    }
131
    /// Returns a mutable reference to the underlying drain.
    ///
    /// Writing to the drain directly corrupts the frame; intended for
    /// inspection/configuration of the sink only.
    ///
    /// # Panics
    /// Never in practice: the drain is only taken out by `finish`, which
    /// consumes `self`.
    pub fn get_mut(&mut self) -> &mut W {
        self.drain
            .as_mut()
            .expect("streaming encoder drain is present until finish consumes self")
    }
144
    /// Terminates the frame and returns the drain.
    ///
    /// Validates the pledged content size (if any) against the bytes actually
    /// consumed, emits the final block (an empty raw block when nothing is
    /// pending, so a write-less encoder still produces a valid frame),
    /// appends the content checksum when the `hash` feature is enabled, and
    /// flushes the drain.
    ///
    /// # Errors
    /// Fails if the encoder is poisoned, the pledge is unmet, or any write to
    /// the drain fails.
    pub fn finish(mut self) -> Result<W, Error> {
        self.ensure_open()?;

        // The header advertises the pledge as an exact size; refuse to close
        // a frame whose payload does not match it.
        if let Some(pledged) = self.pledged_content_size
            && self.bytes_consumed != pledged
        {
            return Err(invalid_input_error(
                "pledged content size does not match bytes consumed",
            ));
        }

        // If nothing was ever written the header has not been emitted yet.
        self.ensure_frame_started()?;

        if self.pending.is_empty() {
            self.write_empty_last_block()
                .map_err(|err| self.fail(err))?;
        } else {
            self.emit_pending_block(true)?;
        }

        let mut drain = self
            .drain
            .take()
            .expect("streaming encoder drain must be present when finishing");

        #[cfg(feature = "hash")]
        {
            // zstd content checksum: low 32 bits of the XxHash64 of the
            // uncompressed content, little-endian.
            let checksum = self.hasher.finish() as u32;
            drain
                .write_all(&checksum.to_le_bytes())
                .map_err(|err| self.fail(err))?;
        }

        drain.flush().map_err(|err| self.fail(err))?;
        Ok(drain)
    }
188
189 fn ensure_open(&self) -> Result<(), Error> {
190 if self.errored {
191 return Err(self.sticky_error());
192 }
193 Ok(())
194 }
195
    /// Builds the error returned by every call after the encoder is poisoned,
    /// replaying whichever of the first failure's kind/message were captured.
    fn sticky_error(&self) -> Error {
        match (self.last_error_kind, self.last_error_message.as_deref()) {
            (Some(kind), Some(message)) => error_with_kind_message(
                kind,
                format!(
                    "streaming encoder is in an errored state due to previous {kind:?} failure: {message}"
                ),
            ),
            // NOTE(review): `fail` sets kind and message together, so the two
            // single-sided arms below appear unreachable today; kept defensive.
            (Some(kind), None) => error_from_kind(kind),
            (None, Some(message)) => other_error_owned(format!(
                "streaming encoder is in an errored state: {message}"
            )),
            (None, None) => other_error("streaming encoder is in an errored state"),
        }
    }
214
215 fn drain_mut(&mut self) -> Result<&mut W, Error> {
216 self.drain
217 .as_mut()
218 .ok_or_else(|| other_error("streaming encoder has no active drain"))
219 }
220
    /// Emits the frame header the first time it is called; later calls are
    /// no-ops. Resets all per-frame state before writing the header.
    fn ensure_frame_started(&mut self) -> Result<(), Error> {
        if self.frame_started {
            return Ok(());
        }

        self.ensure_level_supported()?;
        // Fresh frame: reset the matcher, the repeat-offset history to the
        // zstd defaults, any carried-over Huffman/FSE tables, and the content
        // hasher.
        self.state.matcher.reset(self.compression_level);
        self.state.offset_hist = [1, 4, 8];
        self.state.last_huff_table = None;
        self.state.fse_tables.ll_previous = None;
        self.state.fse_tables.ml_previous = None;
        self.state.fse_tables.of_previous = None;
        #[cfg(feature = "hash")]
        {
            self.hasher = XxHash64::with_seed(0);
        }

        // Queried after reset so size hints/pledges can influence the window.
        let window_size = self.state.matcher.window_size();
        if window_size == 0 {
            return Err(invalid_input_error(
                "matcher reported window_size == 0, which is invalid",
            ));
        }

        let header = FrameHeader {
            frame_content_size: self.pledged_content_size,
            single_segment: false,
            content_checksum: cfg!(feature = "hash"),
            dictionary_id: None,
            window_size: Some(window_size),
        };
        let mut encoded_header = Vec::new();
        header.serialize(&mut encoded_header);
        self.drain_mut()
            .and_then(|drain| drain.write_all(&encoded_header))
            .map_err(|err| self.fail(err))?;

        // Only marked started after the header actually reached the drain.
        self.frame_started = true;
        Ok(())
    }
261
262 fn block_capacity(&self) -> usize {
263 let matcher_window = self.state.matcher.window_size() as usize;
264 core::cmp::max(1, core::cmp::min(matcher_window, MAX_BLOCK_SIZE as usize))
265 }
266
267 fn allocate_pending_space(&mut self, block_capacity: usize) -> Vec<u8> {
268 let mut space = match self.compression_level {
269 CompressionLevel::Fastest
270 | CompressionLevel::Default
271 | CompressionLevel::Better
272 | CompressionLevel::Best
273 | CompressionLevel::Level(_) => self.state.matcher.get_next_space(),
274 CompressionLevel::Uncompressed => Vec::new(),
275 };
276 space.clear();
277 if space.capacity() > block_capacity {
278 space.shrink_to(block_capacity);
279 }
280 if space.capacity() < block_capacity {
281 space.reserve(block_capacity - space.capacity());
282 }
283 space
284 }
285
    /// Emits `pending` if (and only if) it has grown to exactly one full block.
    ///
    /// Returns `None` when nothing needs to happen (block not full) or the
    /// block was written successfully. On a write failure the encoder is
    /// poisoned and the payload is restored into `pending`; the caller then
    /// gets `Some(Ok(consumed))` when it already made progress in this write
    /// call (so `write` can report a short count per `io::Write` convention),
    /// or `Some(Err(..))` when no progress can be reported.
    fn emit_full_pending_block(
        &mut self,
        block_capacity: usize,
        consumed: usize,
    ) -> Option<Result<usize, Error>> {
        if self.pending.len() != block_capacity {
            return None;
        }

        // Swap in a fresh buffer before encoding so `pending` is valid even
        // if encoding/writing fails.
        let new_pending = self.allocate_pending_space(block_capacity);
        let full_block = mem::replace(&mut self.pending, new_pending);
        if let Err((err, restored_block)) = self.encode_block(full_block, false) {
            self.pending = restored_block;
            let err = self.fail(err);
            if consumed > 0 {
                return Some(Ok(consumed));
            }
            return Some(Err(err));
        }
        None
    }
307
308 fn emit_pending_block(&mut self, last_block: bool) -> Result<(), Error> {
309 let block = mem::take(&mut self.pending);
310 if let Err((err, restored_block)) = self.encode_block(block, last_block) {
311 self.pending = restored_block;
312 return Err(self.fail(err));
313 }
314 if !last_block {
315 let block_capacity = self.block_capacity();
316 self.pending = self.allocate_pending_space(block_capacity);
317 }
318 Ok(())
319 }
320
321 fn ensure_level_supported(&self) -> Result<(), Error> {
325 match self.compression_level {
326 CompressionLevel::Uncompressed
327 | CompressionLevel::Fastest
328 | CompressionLevel::Default
329 | CompressionLevel::Better
330 | CompressionLevel::Best
331 | CompressionLevel::Level(_) => Ok(()),
332 }
333 }
334
    /// Serializes one block (header + payload) and writes it to the drain.
    ///
    /// On a write failure, returns the error together with the uncompressed
    /// payload so the caller can restore it into `pending` without data loss.
    /// On success, feeds the uncompressed bytes into the frame checksum.
    fn encode_block(
        &mut self,
        uncompressed_data: Vec<u8>,
        last_block: bool,
    ) -> Result<(), (Error, Vec<u8>)> {
        let mut raw_block = Some(uncompressed_data);
        // +3 for the block header that precedes the payload.
        let mut encoded = Vec::with_capacity(self.block_capacity() + 3);
        // The compressing path hands the payload buffer to the matcher; this
        // flag tells the recovery/hash code where the bytes now live.
        let mut moved_into_matcher = false;
        if raw_block.as_ref().is_some_and(|block| block.is_empty()) {
            // Empty payload: a zero-length raw block (used to terminate an
            // otherwise empty frame).
            let header = BlockHeader {
                last_block,
                block_type: crate::blocks::block::BlockType::Raw,
                block_size: 0,
            };
            header.serialize(&mut encoded);
        } else {
            match self.compression_level {
                CompressionLevel::Uncompressed => {
                    // Raw block: header followed by the payload verbatim.
                    let block = raw_block.as_ref().expect("raw block missing");
                    let header = BlockHeader {
                        last_block,
                        block_type: crate::blocks::block::BlockType::Raw,
                        block_size: block.len() as u32,
                    };
                    header.serialize(&mut encoded);
                    encoded.extend_from_slice(block);
                }
                CompressionLevel::Fastest
                | CompressionLevel::Default
                | CompressionLevel::Better
                | CompressionLevel::Best
                | CompressionLevel::Level(_) => {
                    let block = raw_block.take().expect("raw block missing");
                    debug_assert!(!block.is_empty(), "empty blocks handled above");
                    compress_block_encoded(&mut self.state, last_block, block, &mut encoded);
                    moved_into_matcher = true;
                }
            }
        }

        if let Err(err) = self.drain_mut().and_then(|drain| drain.write_all(&encoded)) {
            // Recover the payload (copying it back out of the matcher if it
            // was moved there) so the caller can restore `pending`.
            let restored = if moved_into_matcher {
                self.state.matcher.get_last_space().to_vec()
            } else {
                raw_block.unwrap_or_default()
            };
            return Err((err, restored));
        }

        if moved_into_matcher {
            #[cfg(feature = "hash")]
            {
                // The matcher owns the payload now; hash it from its storage.
                self.hasher.write(self.state.matcher.get_last_space());
            }
        } else {
            self.hash_block(raw_block.as_deref().unwrap_or(&[]));
        }
        Ok(())
    }
395
396 fn write_empty_last_block(&mut self) -> Result<(), Error> {
397 self.encode_block(Vec::new(), true).map_err(|(err, _)| err)
398 }
399
400 fn fail(&mut self, err: Error) -> Error {
401 self.errored = true;
402 if self.last_error_kind.is_none() {
403 self.last_error_kind = Some(err.kind());
404 }
405 if self.last_error_message.is_none() {
406 self.last_error_message = Some(err.to_string());
407 }
408 err
409 }
410
    /// Feeds uncompressed bytes into the frame checksum (`hash` builds only).
    #[cfg(feature = "hash")]
    fn hash_block(&mut self, uncompressed_data: &[u8]) {
        self.hasher.write(uncompressed_data);
    }

    /// No-op stand-in so call sites need no `cfg` of their own.
    #[cfg(not(feature = "hash"))]
    fn hash_block(&mut self, _uncompressed_data: &[u8]) {}
418}
419
420impl<W: Write, M: Matcher> Write for StreamingEncoder<W, M> {
421 fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
422 self.ensure_open()?;
423 if buf.is_empty() {
424 return Ok(0);
425 }
426
427 if let Some(pledged) = self.pledged_content_size
431 && self.bytes_consumed >= pledged
432 {
433 return Err(invalid_input_error(
434 "write would exceed pledged content size",
435 ));
436 }
437
438 self.ensure_frame_started()?;
439
440 let buf = if let Some(pledged) = self.pledged_content_size {
444 let remaining_allowed = pledged
445 .checked_sub(self.bytes_consumed)
446 .ok_or_else(|| invalid_input_error("bytes consumed exceed pledged content size"))?;
447 if remaining_allowed == 0 {
448 return Err(invalid_input_error(
449 "write would exceed pledged content size",
450 ));
451 }
452 let accepted = core::cmp::min(
453 buf.len(),
454 usize::try_from(remaining_allowed).unwrap_or(usize::MAX),
455 );
456 &buf[..accepted]
457 } else {
458 buf
459 };
460
461 let block_capacity = self.block_capacity();
462 if self.pending.capacity() == 0 {
463 self.pending = self.allocate_pending_space(block_capacity);
464 }
465 let mut remaining = buf;
466 let mut consumed = 0usize;
467
468 while !remaining.is_empty() {
469 if let Some(result) = self.emit_full_pending_block(block_capacity, consumed) {
470 return result;
471 }
472
473 let available = block_capacity - self.pending.len();
474 let to_take = core::cmp::min(remaining.len(), available);
475 if to_take == 0 {
476 break;
477 }
478 self.pending.extend_from_slice(&remaining[..to_take]);
479 remaining = &remaining[to_take..];
480 consumed += to_take;
481
482 if let Some(result) = self.emit_full_pending_block(block_capacity, consumed) {
483 if let Ok(n) = &result {
484 self.bytes_consumed += *n as u64;
485 }
486 return result;
487 }
488 }
489 self.bytes_consumed += consumed as u64;
490 Ok(consumed)
491 }
492
493 fn flush(&mut self) -> Result<(), Error> {
494 self.ensure_open()?;
495 if self.pending.is_empty() {
496 return self
497 .drain_mut()
498 .and_then(|drain| drain.flush())
499 .map_err(|err| self.fail(err));
500 }
501 self.ensure_frame_started()?;
502 self.emit_pending_block(false)?;
503 self.drain_mut()
504 .and_then(|drain| drain.flush())
505 .map_err(|err| self.fail(err))
506 }
507}
508
509fn error_from_kind(kind: ErrorKind) -> Error {
510 Error::from(kind)
511}
512
/// Builds an error carrying both `kind` and `message`, papering over the
/// different payload requirements of the std and no_std `Error::new`.
fn error_with_kind_message(kind: ErrorKind, message: String) -> Error {
    #[cfg(feature = "std")]
    {
        Error::new(kind, message)
    }
    #[cfg(not(feature = "std"))]
    {
        // no_std `Error::new` takes a boxed payload.
        Error::new(kind, alloc::boxed::Box::new(message))
    }
}
523
/// Builds the error used for caller mistakes (late pledge, exceeded pledge,
/// zero-sized matcher window, ...).
///
/// NOTE(review): the no_std branch reports `ErrorKind::Other` instead of
/// `InvalidInput` — presumably the no_std error type lacks that kind — so
/// callers matching on the kind see different values across builds; confirm
/// this asymmetry is intended.
fn invalid_input_error(message: &str) -> Error {
    #[cfg(feature = "std")]
    {
        Error::new(ErrorKind::InvalidInput, message)
    }
    #[cfg(not(feature = "std"))]
    {
        Error::new(
            ErrorKind::Other,
            alloc::boxed::Box::new(alloc::string::String::from(message)),
        )
    }
}
537
/// Builds an `Other`-kind error from an owned message, for std and no_std.
fn other_error_owned(message: String) -> Error {
    #[cfg(feature = "std")]
    {
        Error::other(message)
    }
    #[cfg(not(feature = "std"))]
    {
        Error::new(ErrorKind::Other, alloc::boxed::Box::new(message))
    }
}
548
/// Builds an `Other`-kind error from a borrowed message, for std and no_std.
fn other_error(message: &str) -> Error {
    #[cfg(feature = "std")]
    {
        Error::other(message)
    }
    #[cfg(not(feature = "std"))]
    {
        // no_std needs an owned, boxed payload.
        Error::new(
            ErrorKind::Other,
            alloc::boxed::Box::new(alloc::string::String::from(message)),
        )
    }
}
562
563#[cfg(test)]
564mod tests {
565 use crate::decoding::StreamingDecoder;
566 use crate::encoding::{CompressionLevel, Matcher, Sequence, StreamingEncoder};
567 use crate::io::{Error, ErrorKind, Read, Write};
568 use alloc::vec;
569 use alloc::vec::Vec;
570
    /// Minimal `Matcher` stub with a configurable window that performs no real
    /// matching — it emits every committed byte as literals. Used to make
    /// block boundaries and error paths deterministic in tests.
    struct TinyMatcher {
        // Payload most recently handed over via `commit_space`.
        last_space: Vec<u8>,
        // Reported verbatim by `window_size()`; effectively the block size.
        window_size: u64,
    }

    impl TinyMatcher {
        fn new(window_size: u64) -> Self {
            Self {
                last_space: Vec::new(),
                window_size,
            }
        }
    }

    impl Matcher for TinyMatcher {
        fn get_next_space(&mut self) -> Vec<u8> {
            // Hand out a zeroed window-sized buffer; the encoder clears it.
            vec![0; self.window_size as usize]
        }

        fn get_last_space(&mut self) -> &[u8] {
            self.last_space.as_slice()
        }

        fn commit_space(&mut self, space: Vec<u8>) {
            self.last_space = space;
        }

        fn skip_matching(&mut self) {}

        // No match search: the whole committed space becomes one literal run.
        fn start_matching(&mut self, mut handle_sequence: impl for<'a> FnMut(Sequence<'a>)) {
            handle_sequence(Sequence::Literals {
                literals: self.last_space.as_slice(),
            });
        }

        fn reset(&mut self, _level: CompressionLevel) {
            self.last_space.clear();
        }

        fn window_size(&self) -> u64 {
            self.window_size
        }
    }
614
    /// Writer that errors exactly once, on its N-th `write` call; all other
    /// writes succeed and are captured in `sink`.
    struct FailingWriteOnce {
        // Number of `write` calls seen so far.
        writes: usize,
        // 1-based index of the call that should fail.
        fail_on_write_number: usize,
        // Bytes accepted by the successful writes.
        sink: Vec<u8>,
    }

    impl FailingWriteOnce {
        fn new(fail_on_write_number: usize) -> Self {
            Self {
                writes: 0,
                fail_on_write_number,
                sink: Vec::new(),
            }
        }
    }

    impl Write for FailingWriteOnce {
        fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
            self.writes += 1;
            if self.writes == self.fail_on_write_number {
                return Err(super::other_error("injected write failure"));
            }
            self.sink.extend_from_slice(buf);
            Ok(buf.len())
        }

        fn flush(&mut self) -> Result<(), Error> {
            Ok(())
        }
    }
645
    /// Writer that fails its N-th `write` call with a caller-chosen
    /// `ErrorKind`, for asserting that the sticky error preserves the kind.
    struct FailingWithKind {
        writes: usize,
        fail_on_write_number: usize,
        kind: ErrorKind,
    }

    impl FailingWithKind {
        fn new(fail_on_write_number: usize, kind: ErrorKind) -> Self {
            Self {
                writes: 0,
                fail_on_write_number,
                kind,
            }
        }
    }

    impl Write for FailingWithKind {
        fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
            self.writes += 1;
            if self.writes == self.fail_on_write_number {
                return Err(Error::from(self.kind));
            }
            // Bytes are discarded; only call counting matters here.
            Ok(buf.len())
        }

        fn flush(&mut self) -> Result<(), Error> {
            Ok(())
        }
    }
675
    /// Writer that, on its N-th `write`, accepts only a short prefix and then
    /// fails every later call — simulating a partial write followed by a hard
    /// failure.
    struct PartialThenFailWriter {
        writes: usize,
        // 1-based index of the call that triggers the partial write.
        fail_on_write_number: usize,
        // How many bytes the triggering call still accepts.
        partial_prefix_len: usize,
        // Once set, all further writes fail immediately.
        terminal_failure: bool,
        sink: Vec<u8>,
    }

    impl PartialThenFailWriter {
        fn new(fail_on_write_number: usize, partial_prefix_len: usize) -> Self {
            Self {
                writes: 0,
                fail_on_write_number,
                partial_prefix_len,
                terminal_failure: false,
                sink: Vec::new(),
            }
        }
    }

    impl Write for PartialThenFailWriter {
        fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
            if self.terminal_failure {
                return Err(super::other_error("injected terminal write failure"));
            }

            self.writes += 1;
            if self.writes == self.fail_on_write_number {
                let written = core::cmp::min(self.partial_prefix_len, buf.len());
                if written > 0 {
                    // Accept the prefix now; everything after this fails.
                    self.sink.extend_from_slice(&buf[..written]);
                    self.terminal_failure = true;
                    return Ok(written);
                }
                return Err(super::other_error("injected terminal write failure"));
            }

            self.sink.extend_from_slice(buf);
            Ok(buf.len())
        }

        fn flush(&mut self) -> Result<(), Error> {
            Ok(())
        }
    }
721
    // Chunked writes across many block boundaries must still decode to the
    // original payload.
    #[test]
    fn streaming_encoder_roundtrip_multiple_writes() {
        let payload = b"streaming-encoder-roundtrip-".repeat(1024);
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        for chunk in payload.chunks(313) {
            encoder.write_all(chunk).unwrap();
        }
        let compressed = encoder.finish().unwrap();

        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }
736
    // flush() must push a partial block to the drain, and the frame must
    // still round-trip after finish().
    #[test]
    fn flush_emits_nonempty_partial_output() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.write_all(b"partial-block").unwrap();
        encoder.flush().unwrap();
        let flushed_len = encoder.get_ref().len();
        assert!(
            flushed_len > 0,
            "flush should emit header+partial block bytes"
        );
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, b"partial-block");
    }
753
    // A flush before any write must not commit a frame header to the drain.
    #[test]
    fn flush_without_writes_does_not_emit_frame_header() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.flush().unwrap();
        assert!(encoder.get_ref().is_empty());
    }
760
    // With a 4-byte block, writing exactly 4 bytes must emit the block within
    // the same write() call (more output than writing only 3 bytes).
    #[test]
    fn block_boundary_write_emits_block_in_same_call() {
        let mut boundary = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            Vec::new(),
            CompressionLevel::Uncompressed,
        );
        let mut below = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            Vec::new(),
            CompressionLevel::Uncompressed,
        );

        boundary.write_all(b"ABCD").unwrap();
        below.write_all(b"ABC").unwrap();

        let boundary_len = boundary.get_ref().len();
        let below_len = below.get_ref().len();
        assert!(
            boundary_len > below_len,
            "full block should be emitted immediately at block boundary"
        );
    }
784
    // finish() consumes the encoder and produces a complete, decodable frame.
    #[test]
    fn finish_consumes_encoder_and_emits_frame() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.write_all(b"abc").unwrap();
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, b"abc");
    }
795
    // finish() on a never-written encoder still yields a valid empty frame.
    #[test]
    fn finish_without_writes_emits_empty_frame() {
        let encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert!(decoded.is_empty());
    }
805
    // An empty buffer is a no-op write returning 0, per the Write contract.
    #[test]
    fn write_empty_buffer_returns_zero() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        assert_eq!(encoder.write(&[]).unwrap(), 0);
        let _ = encoder.finish().unwrap();
    }
812
    // Raw-block (Uncompressed) frames must round-trip as well.
    #[test]
    fn uncompressed_level_roundtrip() {
        let payload = b"uncompressed-streaming-roundtrip".repeat(64);
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Uncompressed);
        for chunk in payload.chunks(41) {
            encoder.write_all(chunk).unwrap();
        }
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }
826
    // Round-trip at the Better compression level.
    #[test]
    fn better_level_streaming_roundtrip() {
        let payload = b"better-level-streaming-test".repeat(256);
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Better);
        for chunk in payload.chunks(53) {
            encoder.write_all(chunk).unwrap();
        }
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }
840
    // A matcher advertising window_size == 0 must be rejected at frame start.
    #[test]
    fn zero_window_matcher_returns_invalid_input_error() {
        let mut encoder = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(0),
            Vec::new(),
            CompressionLevel::Fastest,
        );
        let err = encoder.write_all(b"payload").unwrap_err();
        assert_eq!(err.kind(), ErrorKind::InvalidInput);
    }
851
    // Round-trip at the Best compression level with a larger payload.
    #[test]
    fn best_level_streaming_roundtrip() {
        let payload = b"best-level-streaming-test".repeat(8 * 1024);
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Best);
        for chunk in payload.chunks(53) {
            encoder.write_all(chunk).unwrap();
        }
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }
867
    // After any write failure, every later operation (flush, write, finish)
    // must also fail, and nothing extra reaches the drain.
    #[test]
    fn write_failure_poisoning_is_sticky() {
        let mut encoder = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            FailingWriteOnce::new(1),
            CompressionLevel::Uncompressed,
        );

        assert!(encoder.write_all(b"ABCD").is_err());
        assert!(encoder.flush().is_err());
        assert!(encoder.write_all(b"EFGH").is_err());
        assert_eq!(encoder.get_ref().sink.len(), 0);
        assert!(encoder.finish().is_err());
    }
882
    // The sticky error must preserve the ErrorKind of the first failure.
    #[test]
    fn poisoned_encoder_returns_original_error_kind() {
        let mut encoder = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            FailingWithKind::new(1, ErrorKind::BrokenPipe),
            CompressionLevel::Uncompressed,
        );

        let first_error = encoder.write_all(b"ABCD").unwrap_err();
        assert_eq!(first_error.kind(), ErrorKind::BrokenPipe);

        let second_error = encoder.write_all(b"EFGH").unwrap_err();
        assert_eq!(second_error.kind(), ErrorKind::BrokenPipe);
    }
897
    // When the 3rd drain write fails (after two 4-byte blocks succeeded),
    // write() must report the 8 consumed bytes, then poison the encoder.
    #[test]
    fn write_reports_progress_but_poisoning_is_sticky_after_later_block_failure() {
        let payload = b"ABCDEFGHIJKL";
        let mut encoder = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            FailingWriteOnce::new(3),
            CompressionLevel::Uncompressed,
        );

        let first_write = encoder.write(payload).unwrap();
        assert_eq!(first_write, 8);
        assert!(encoder.write(&payload[first_write..]).is_err());
        assert!(encoder.flush().is_err());
        assert!(encoder.write_all(b"EFGH").is_err());
    }
913
    // A partial drain write followed by hard failure: progress is reported
    // once, then the encoder stays poisoned.
    #[test]
    fn partial_write_failure_after_progress_poisons_encoder() {
        let payload = b"ABCDEFGHIJKL";
        let mut encoder = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            PartialThenFailWriter::new(3, 1),
            CompressionLevel::Uncompressed,
        );

        let first_write = encoder.write(payload).unwrap();
        assert_eq!(first_write, 8);

        let second_write = encoder.write(&payload[first_write..]);
        assert!(second_write.is_err());
        assert!(encoder.flush().is_err());
        assert!(encoder.write_all(b"MNOP").is_err());
    }
931
    // A custom matcher plus get_mut() access still produces a valid frame.
    #[test]
    fn new_with_matcher_and_get_mut_work() {
        let matcher = TinyMatcher::new(128 * 1024);
        let mut encoder =
            StreamingEncoder::new_with_matcher(matcher, Vec::new(), CompressionLevel::Fastest);
        encoder.get_mut().extend_from_slice(b"");
        encoder.write_all(b"custom-matcher").unwrap();
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, b"custom-matcher");
    }
945
    // Interop check: the reference C zstd implementation must decode our
    // streamed frames.
    #[cfg(feature = "std")]
    #[test]
    fn streaming_encoder_output_decompresses_with_c_zstd() {
        let payload = b"tenant=demo op=put key=streaming value=abcdef\n".repeat(4096);
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        for chunk in payload.chunks(1024) {
            encoder.write_all(chunk).unwrap();
        }
        let compressed = encoder.finish().unwrap();

        let mut decoded = Vec::with_capacity(payload.len());
        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }
960
    // A pledge must surface as the frame-content-size field of the header and
    // the frame must still decode.
    #[test]
    fn pledged_content_size_written_in_header() {
        let payload = b"hello world, pledged size test";
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder
            .set_pledged_content_size(payload.len() as u64)
            .unwrap();
        encoder.write_all(payload).unwrap();
        let compressed = encoder.finish().unwrap();

        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
            .unwrap()
            .0;
        assert_eq!(header.frame_content_size(), payload.len() as u64);

        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }
983
984 #[test]
985 fn pledged_content_size_mismatch_returns_error() {
986 let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
987 encoder.set_pledged_content_size(100).unwrap();
988 encoder.write_all(b"short payload").unwrap(); let err = encoder.finish().unwrap_err();
990 assert_eq!(err.kind(), ErrorKind::InvalidInput);
991 }
992
    // write_all past the pledge must surface InvalidInput.
    #[test]
    fn write_exceeding_pledge_returns_error() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.set_pledged_content_size(5).unwrap();
        let err = encoder.write_all(b"exceeds five bytes").unwrap_err();
        assert_eq!(err.kind(), ErrorKind::InvalidInput);
    }
1000
    // A write straddling the pledge boundary is truncated to the pledge;
    // the next byte is rejected.
    #[test]
    fn write_straddling_pledge_reports_partial_progress() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.set_pledged_content_size(5).unwrap();
        assert_eq!(encoder.write(b"abcdef").unwrap(), 5);
        let err = encoder.write(b"g").unwrap_err();
        assert_eq!(err.kind(), ErrorKind::InvalidInput);
    }
1011
    // The pledge can only be set before the frame header is committed.
    #[test]
    fn pledged_content_size_after_write_returns_error() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.write_all(b"already writing").unwrap();
        let err = encoder.set_pledged_content_size(15).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::InvalidInput);
    }
1019
    // A pre-write size hint must not grow the advertised window size, a late
    // hint must be rejected, and the hinted frame must still decode.
    #[test]
    fn source_size_hint_directly_reduces_window_header() {
        let payload = b"streaming-source-size-hint".repeat(64);

        let mut no_hint = StreamingEncoder::new(Vec::new(), CompressionLevel::from_level(11));
        no_hint.write_all(payload.as_slice()).unwrap();
        let no_hint_frame = no_hint.finish().unwrap();
        let no_hint_header = crate::decoding::frame::read_frame_header(no_hint_frame.as_slice())
            .unwrap()
            .0;
        let no_hint_window = no_hint_header.window_size().unwrap();

        let mut with_hint = StreamingEncoder::new(Vec::new(), CompressionLevel::from_level(11));
        with_hint
            .set_source_size_hint(payload.len() as u64)
            .unwrap();
        with_hint.write_all(payload.as_slice()).unwrap();
        let late_hint_err = with_hint
            .set_source_size_hint(payload.len() as u64)
            .unwrap_err();
        assert_eq!(late_hint_err.kind(), ErrorKind::InvalidInput);
        let with_hint_frame = with_hint.finish().unwrap();
        let with_hint_header =
            crate::decoding::frame::read_frame_header(with_hint_frame.as_slice())
                .unwrap()
                .0;
        let with_hint_window = with_hint_header.window_size().unwrap();

        assert!(
            with_hint_window <= no_hint_window,
            "source size hint should not increase advertised window"
        );

        let mut decoder = StreamingDecoder::new(with_hint_frame.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }
1058
    // Interop: a pledged frame carries the content size in the header and is
    // decodable by the reference C zstd implementation.
    #[cfg(feature = "std")]
    #[test]
    fn pledged_content_size_c_zstd_compatible() {
        let payload = b"tenant=demo op=put key=streaming value=abcdef\n".repeat(4096);
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder
            .set_pledged_content_size(payload.len() as u64)
            .unwrap();
        for chunk in payload.chunks(1024) {
            encoder.write_all(chunk).unwrap();
        }
        let compressed = encoder.finish().unwrap();

        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
            .unwrap()
            .0;
        assert_eq!(header.frame_content_size(), payload.len() as u64);

        let mut decoded = Vec::new();
        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }
1083
    // Without a pledge, the header must allocate zero bytes for the
    // frame-content-size field (reported as 0 / unknown).
    #[test]
    fn no_pledged_size_omits_fcs_from_header() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.write_all(b"no pledged size").unwrap();
        let compressed = encoder.finish().unwrap();

        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
            .unwrap()
            .0;
        assert_eq!(header.frame_content_size(), 0);
        assert_eq!(header.descriptor.frame_content_size_bytes().unwrap(), 0);
    }
1099}