1use alloc::format;
2use alloc::string::{String, ToString};
3use alloc::vec::Vec;
4use core::mem;
5
6use crate::common::MAX_BLOCK_SIZE;
7#[cfg(feature = "hash")]
8use core::hash::Hasher;
9#[cfg(feature = "hash")]
10use twox_hash::XxHash64;
11
12use crate::encoding::levels::compress_block_encoded;
13use crate::encoding::{
14 CompressionLevel, MatchGeneratorDriver, Matcher, block_header::BlockHeader,
15 frame_compressor::CompressState, frame_compressor::FseTables, frame_header::FrameHeader,
16};
17use crate::io::{Error, ErrorKind, Write};
18
/// A zstd frame encoder that accepts uncompressed input incrementally via
/// [`Write`] and forwards the compressed frame to an inner writer.
pub struct StreamingEncoder<W: Write, M: Matcher = MatchGeneratorDriver> {
    /// Destination for compressed bytes; only `None` after `finish` takes it.
    drain: Option<W>,
    /// Level selected at construction; fixed for the encoder's lifetime.
    compression_level: CompressionLevel,
    /// Matcher plus entropy-table/offset state shared with the block encoder.
    state: CompressState<M>,
    /// Uncompressed bytes buffered until a full block can be emitted.
    pending: Vec<u8>,
    /// Once set, every subsequent operation fails with `sticky_error`.
    errored: bool,
    /// Kind of the first failure, captured by `fail` for later reporting.
    last_error_kind: Option<ErrorKind>,
    /// Message of the first failure, captured by `fail` for later reporting.
    last_error_message: Option<String>,
    /// Whether the frame header has already been written to the drain.
    frame_started: bool,
    /// Exact uncompressed frame size promised up front, if any.
    pledged_content_size: Option<u64>,
    /// Total uncompressed bytes accepted so far (checked against the pledge).
    bytes_consumed: u64,
    /// Running xxhash64 of all uncompressed input; low 32 bits become the
    /// frame's content checksum in `finish`.
    #[cfg(feature = "hash")]
    hasher: XxHash64,
}
38
39impl<W: Write> StreamingEncoder<W, MatchGeneratorDriver> {
40 pub fn new(drain: W, compression_level: CompressionLevel) -> Self {
45 Self::new_with_matcher(
46 MatchGeneratorDriver::new(MAX_BLOCK_SIZE as usize, 1),
47 drain,
48 compression_level,
49 )
50 }
51}
52
impl<W: Write, M: Matcher> StreamingEncoder<W, M> {
    /// Builds an encoder around a caller-supplied [`Matcher`] implementation.
    pub fn new_with_matcher(matcher: M, drain: W, compression_level: CompressionLevel) -> Self {
        Self {
            drain: Some(drain),
            compression_level,
            state: CompressState {
                matcher,
                last_huff_table: None,
                fse_tables: FseTables::new(),
                // Start-of-frame repeat-offset history (1, 4, 8) as defined
                // by the zstd format.
                offset_hist: [1, 4, 8],
            },
            pending: Vec::new(),
            errored: false,
            last_error_kind: None,
            last_error_message: None,
            frame_started: false,
            pledged_content_size: None,
            bytes_consumed: 0,
            #[cfg(feature = "hash")]
            hasher: XxHash64::with_seed(0),
        }
    }

    /// Declares the exact number of uncompressed bytes this frame will carry.
    ///
    /// Must be called before the first write: the pledge is embedded in the
    /// frame header by `ensure_frame_started` and enforced by `write` and
    /// `finish`.
    ///
    /// # Errors
    /// Fails if the encoder is poisoned or the frame header was already
    /// written.
    pub fn set_pledged_content_size(&mut self, size: u64) -> Result<(), Error> {
        self.ensure_open()?;
        if self.frame_started {
            return Err(invalid_input_error(
                "pledged content size must be set before the first write",
            ));
        }
        self.pledged_content_size = Some(size);
        Ok(())
    }

    /// Returns a shared reference to the inner writer.
    ///
    /// # Panics
    /// Never in practice: the drain is only taken by `finish`, which consumes
    /// `self`.
    pub fn get_ref(&self) -> &W {
        self.drain
            .as_ref()
            .expect("streaming encoder drain is present until finish consumes self")
    }

    /// Returns a mutable reference to the inner writer.
    ///
    /// Writing to it directly will corrupt the compressed stream; see
    /// `get_ref` for the panic rationale.
    pub fn get_mut(&mut self) -> &mut W {
        self.drain
            .as_mut()
            .expect("streaming encoder drain is present until finish consumes self")
    }

    /// Finalizes the frame — last block, then (with the `hash` feature) the
    /// content checksum — flushes the inner writer and returns it.
    ///
    /// # Errors
    /// Fails if the encoder is poisoned, the pledged size does not match the
    /// bytes actually written, or any write/flush to the drain fails.
    pub fn finish(mut self) -> Result<W, Error> {
        self.ensure_open()?;

        // A pledge that does not match the data actually consumed would
        // produce a frame header that lies to decoders.
        if let Some(pledged) = self.pledged_content_size
            && self.bytes_consumed != pledged
        {
            return Err(invalid_input_error(
                "pledged content size does not match bytes consumed",
            ));
        }

        self.ensure_frame_started()?;

        if self.pending.is_empty() {
            // A frame must end with a block flagged `last_block`; emit an
            // empty raw block when nothing is buffered.
            self.write_empty_last_block()
                .map_err(|err| self.fail(err))?;
        } else {
            self.emit_pending_block(true)?;
        }

        let mut drain = self
            .drain
            .take()
            .expect("streaming encoder drain must be present when finishing");

        #[cfg(feature = "hash")]
        {
            // Content checksum: low 32 bits of the xxhash64 over all
            // uncompressed input, appended little-endian after the last block.
            let checksum = self.hasher.finish() as u32;
            drain
                .write_all(&checksum.to_le_bytes())
                .map_err(|err| self.fail(err))?;
        }

        drain.flush().map_err(|err| self.fail(err))?;
        Ok(drain)
    }

    /// Fails fast with the sticky error if a previous operation poisoned the
    /// encoder.
    fn ensure_open(&self) -> Result<(), Error> {
        if self.errored {
            return Err(self.sticky_error());
        }
        Ok(())
    }

    /// Reconstructs an error describing the original failure, preserving its
    /// kind and/or message when `fail` managed to capture them.
    fn sticky_error(&self) -> Error {
        match (self.last_error_kind, self.last_error_message.as_deref()) {
            (Some(kind), Some(message)) => error_with_kind_message(
                kind,
                format!(
                    "streaming encoder is in an errored state due to previous {kind:?} failure: {message}"
                ),
            ),
            (Some(kind), None) => error_from_kind(kind),
            (None, Some(message)) => other_error_owned(format!(
                "streaming encoder is in an errored state: {message}"
            )),
            (None, None) => other_error("streaming encoder is in an errored state"),
        }
    }

    /// Borrows the drain, reporting an error instead of panicking when it is
    /// absent.
    fn drain_mut(&mut self) -> Result<&mut W, Error> {
        self.drain
            .as_mut()
            .ok_or_else(|| other_error("streaming encoder has no active drain"))
    }

    /// Writes the frame header and resets per-frame state on the first call;
    /// subsequent calls are no-ops.
    fn ensure_frame_started(&mut self) -> Result<(), Error> {
        if self.frame_started {
            return Ok(());
        }

        self.ensure_level_supported()?;
        self.state.matcher.reset(self.compression_level);
        // Fresh frame: restore the initial repeat offsets and drop any
        // entropy tables carried over from a previous use of the state.
        self.state.offset_hist = [1, 4, 8];
        self.state.last_huff_table = None;
        self.state.fse_tables.ll_previous = None;
        self.state.fse_tables.ml_previous = None;
        self.state.fse_tables.of_previous = None;
        #[cfg(feature = "hash")]
        {
            self.hasher = XxHash64::with_seed(0);
        }

        let window_size = self.state.matcher.window_size();
        if window_size == 0 {
            return Err(invalid_input_error(
                "matcher reported window_size == 0, which is invalid",
            ));
        }

        let header = FrameHeader {
            frame_content_size: self.pledged_content_size,
            single_segment: false,
            // Must agree with `finish`, which only appends the checksum when
            // the `hash` feature is enabled.
            content_checksum: cfg!(feature = "hash"),
            dictionary_id: None,
            window_size: Some(window_size),
        };
        let mut encoded_header = Vec::new();
        header.serialize(&mut encoded_header);
        self.drain_mut()
            .and_then(|drain| drain.write_all(&encoded_header))
            .map_err(|err| self.fail(err))?;

        self.frame_started = true;
        Ok(())
    }

    /// Uncompressed bytes buffered per block: the matcher window capped at
    /// `MAX_BLOCK_SIZE`, and at least 1.
    fn block_capacity(&self) -> usize {
        let matcher_window = self.state.matcher.window_size() as usize;
        core::cmp::max(1, core::cmp::min(matcher_window, MAX_BLOCK_SIZE as usize))
    }

    /// Produces an empty buffer able to hold one block, recycling the
    /// matcher's buffers for the compressing levels to avoid fresh
    /// allocations.
    fn allocate_pending_space(&mut self, block_capacity: usize) -> Vec<u8> {
        let mut space = match self.compression_level {
            CompressionLevel::Fastest
            | CompressionLevel::Default
            | CompressionLevel::Better
            | CompressionLevel::Best => self.state.matcher.get_next_space(),
            _ => Vec::new(),
        };
        space.clear();
        // Bound an oversized recycled buffer, then make sure at least one
        // block fits without reallocating mid-write.
        if space.capacity() > block_capacity {
            space.shrink_to(block_capacity);
        }
        if space.capacity() < block_capacity {
            space.reserve(block_capacity - space.capacity());
        }
        space
    }

    /// Emits `pending` as a non-last block once it has grown to exactly
    /// `block_capacity` bytes.
    ///
    /// Returns `None` when nothing needed emitting or emission succeeded.
    /// On failure the encoder is poisoned and the value the `write` caller
    /// should return is produced: `Ok(consumed)` to report partial progress
    /// when some input was already accepted, otherwise the error itself.
    fn emit_full_pending_block(
        &mut self,
        block_capacity: usize,
        consumed: usize,
    ) -> Option<Result<usize, Error>> {
        if self.pending.len() != block_capacity {
            return None;
        }

        let new_pending = self.allocate_pending_space(block_capacity);
        let full_block = mem::replace(&mut self.pending, new_pending);
        if let Err((err, restored_block)) = self.encode_block(full_block, false) {
            // Put the unwritten data back so internal state stays consistent,
            // even though the poisoned encoder will refuse further writes.
            self.pending = restored_block;
            let err = self.fail(err);
            if consumed > 0 {
                return Some(Ok(consumed));
            }
            return Some(Err(err));
        }
        None
    }

    /// Encodes and writes whatever is currently buffered (used by `flush` and
    /// `finish`), re-arming the pending buffer unless this was the last block.
    fn emit_pending_block(&mut self, last_block: bool) -> Result<(), Error> {
        let block = mem::take(&mut self.pending);
        if let Err((err, restored_block)) = self.encode_block(block, last_block) {
            self.pending = restored_block;
            return Err(self.fail(err));
        }
        if !last_block {
            let block_capacity = self.block_capacity();
            self.pending = self.allocate_pending_space(block_capacity);
        }
        Ok(())
    }

    /// Rejects unsupported compression levels before any output is produced.
    /// Every current level is supported; the exhaustive match turns a new
    /// enum variant into a compile error here rather than a silent gap.
    fn ensure_level_supported(&self) -> Result<(), Error> {
        match self.compression_level {
            CompressionLevel::Uncompressed
            | CompressionLevel::Fastest
            | CompressionLevel::Default
            | CompressionLevel::Better
            | CompressionLevel::Best => Ok(()),
        }
    }

    /// Serializes one block (header plus payload) into a scratch buffer,
    /// writes it to the drain, and folds the uncompressed data into the
    /// content hash on success.
    ///
    /// # Errors
    /// On write failure the uncompressed input is handed back alongside the
    /// error so the caller can restore it into `pending`.
    fn encode_block(
        &mut self,
        uncompressed_data: Vec<u8>,
        last_block: bool,
    ) -> Result<(), (Error, Vec<u8>)> {
        let mut raw_block = Some(uncompressed_data);
        // +3 covers the serialized block header preceding the payload.
        let mut encoded = Vec::with_capacity(self.block_capacity() + 3);
        let mut moved_into_matcher = false;
        if raw_block.as_ref().is_some_and(|block| block.is_empty()) {
            // Empty input still needs a header — e.g. the empty last block
            // `finish` writes for an empty frame.
            let header = BlockHeader {
                last_block,
                block_type: crate::blocks::block::BlockType::Raw,
                block_size: 0,
            };
            header.serialize(&mut encoded);
        } else {
            match self.compression_level {
                CompressionLevel::Uncompressed => {
                    let block = raw_block.as_ref().expect("raw block missing");
                    let header = BlockHeader {
                        last_block,
                        block_type: crate::blocks::block::BlockType::Raw,
                        block_size: block.len() as u32,
                    };
                    header.serialize(&mut encoded);
                    encoded.extend_from_slice(block);
                }
                CompressionLevel::Fastest
                | CompressionLevel::Default
                | CompressionLevel::Better
                | CompressionLevel::Best => {
                    let block = raw_block.take().expect("raw block missing");
                    debug_assert!(!block.is_empty(), "empty blocks handled above");
                    // Ownership of the data moves into the matcher; it stays
                    // reachable through `get_last_space` below.
                    compress_block_encoded(&mut self.state, last_block, block, &mut encoded);
                    moved_into_matcher = true;
                }
            }
        }

        if let Err(err) = self.drain_mut().and_then(|drain| drain.write_all(&encoded)) {
            // Recover the uncompressed bytes for the caller, copying them
            // back out of the matcher if ownership was already transferred.
            let restored = if moved_into_matcher {
                self.state.matcher.get_last_space().to_vec()
            } else {
                raw_block.unwrap_or_default()
            };
            return Err((err, restored));
        }

        if moved_into_matcher {
            #[cfg(feature = "hash")]
            {
                self.hasher.write(self.state.matcher.get_last_space());
            }
        } else {
            self.hash_block(raw_block.as_deref().unwrap_or(&[]));
        }
        Ok(())
    }

    /// Writes the zero-length raw block that terminates a frame when no data
    /// remains buffered.
    fn write_empty_last_block(&mut self) -> Result<(), Error> {
        self.encode_block(Vec::new(), true).map_err(|(err, _)| err)
    }

    /// Poisons the encoder, recording the *first* failure's kind and message
    /// for `sticky_error`, and passes the error through unchanged.
    fn fail(&mut self, err: Error) -> Error {
        self.errored = true;
        if self.last_error_kind.is_none() {
            self.last_error_kind = Some(err.kind());
        }
        if self.last_error_message.is_none() {
            self.last_error_message = Some(err.to_string());
        }
        err
    }

    /// Folds uncompressed input into the running content checksum.
    #[cfg(feature = "hash")]
    fn hash_block(&mut self, uncompressed_data: &[u8]) {
        self.hasher.write(uncompressed_data);
    }

    /// No-op counterpart used when the `hash` feature is disabled.
    #[cfg(not(feature = "hash"))]
    fn hash_block(&mut self, _uncompressed_data: &[u8]) {}
}
393
impl<W: Write, M: Matcher> Write for StreamingEncoder<W, M> {
    /// Buffers `buf` into the pending block, emitting full blocks as they
    /// fill.
    ///
    /// Returns the number of bytes accepted, which may be less than
    /// `buf.len()` when a pledged content size truncates the write. A write
    /// that would start past the pledge is an error; I/O failures poison the
    /// encoder (see `fail`) but still report partial progress when some
    /// bytes were already accepted.
    fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
        self.ensure_open()?;
        if buf.is_empty() {
            return Ok(0);
        }

        // Reject outright before starting the frame if the pledge is already
        // exhausted; this error intentionally does not poison the encoder.
        if let Some(pledged) = self.pledged_content_size
            && self.bytes_consumed >= pledged
        {
            return Err(invalid_input_error(
                "write would exceed pledged content size",
            ));
        }

        self.ensure_frame_started()?;

        // Clamp the input to whatever the pledge still allows, so a
        // straddling write reports partial progress instead of failing.
        let buf = if let Some(pledged) = self.pledged_content_size {
            let remaining_allowed = pledged
                .checked_sub(self.bytes_consumed)
                .ok_or_else(|| invalid_input_error("bytes consumed exceed pledged content size"))?;
            if remaining_allowed == 0 {
                return Err(invalid_input_error(
                    "write would exceed pledged content size",
                ));
            }
            let accepted = core::cmp::min(
                buf.len(),
                usize::try_from(remaining_allowed).unwrap_or(usize::MAX),
            );
            &buf[..accepted]
        } else {
            buf
        };

        let block_capacity = self.block_capacity();
        if self.pending.capacity() == 0 {
            self.pending = self.allocate_pending_space(block_capacity);
        }
        let mut remaining = buf;
        let mut consumed = 0usize;

        while !remaining.is_empty() {
            // Defensive: drain a block left full by a previous call before
            // accepting more input.
            if let Some(result) = self.emit_full_pending_block(block_capacity, consumed) {
                return result;
            }

            let available = block_capacity - self.pending.len();
            let to_take = core::cmp::min(remaining.len(), available);
            if to_take == 0 {
                break;
            }
            self.pending.extend_from_slice(&remaining[..to_take]);
            remaining = &remaining[to_take..];
            consumed += to_take;

            // Emit eagerly when the pending buffer just reached capacity, so
            // a boundary-sized write produces output in the same call.
            if let Some(result) = self.emit_full_pending_block(block_capacity, consumed) {
                // Partial progress still counts against the pledge.
                if let Ok(n) = &result {
                    self.bytes_consumed += *n as u64;
                }
                return result;
            }
        }
        self.bytes_consumed += consumed as u64;
        Ok(consumed)
    }

    /// Emits any buffered partial block, then flushes the inner writer.
    ///
    /// With nothing buffered this only flushes the drain — it deliberately
    /// does not start a frame, so flushing an untouched encoder emits no
    /// header bytes.
    fn flush(&mut self) -> Result<(), Error> {
        self.ensure_open()?;
        if self.pending.is_empty() {
            return self
                .drain_mut()
                .and_then(|drain| drain.flush())
                .map_err(|err| self.fail(err));
        }
        self.ensure_frame_started()?;
        self.emit_pending_block(false)?;
        self.drain_mut()
            .and_then(|drain| drain.flush())
            .map_err(|err| self.fail(err))
    }
}
482
483fn error_from_kind(kind: ErrorKind) -> Error {
484 Error::from(kind)
485}
486
/// Builds an [`Error`] carrying both a kind and an owned message, bridging
/// the constructor differences between the `std` and `no_std` builds of
/// `crate::io::Error`.
fn error_with_kind_message(kind: ErrorKind, message: String) -> Error {
    #[cfg(feature = "std")]
    {
        Error::new(kind, message)
    }
    #[cfg(not(feature = "std"))]
    {
        // The no_std Error::new takes a boxed payload instead of an
        // Into<Box<dyn Error>> value.
        Error::new(kind, alloc::boxed::Box::new(message))
    }
}
497
/// Builds an invalid-input error from a static message.
///
/// NOTE(review): the `no_std` path downgrades the kind to `ErrorKind::Other`
/// — presumably that build's ErrorKind lacks `InvalidInput`; kind-based
/// assertions in the tests therefore only hold under `std`.
fn invalid_input_error(message: &str) -> Error {
    #[cfg(feature = "std")]
    {
        Error::new(ErrorKind::InvalidInput, message)
    }
    #[cfg(not(feature = "std"))]
    {
        Error::new(
            ErrorKind::Other,
            alloc::boxed::Box::new(alloc::string::String::from(message)),
        )
    }
}
511
/// Builds an `Other`-kind error from an already-owned message.
fn other_error_owned(message: String) -> Error {
    #[cfg(feature = "std")]
    {
        Error::other(message)
    }
    #[cfg(not(feature = "std"))]
    {
        Error::new(ErrorKind::Other, alloc::boxed::Box::new(message))
    }
}
522
/// Builds an `Other`-kind error from a borrowed message, allocating the
/// owned copy the error payload requires.
fn other_error(message: &str) -> Error {
    #[cfg(feature = "std")]
    {
        Error::other(message)
    }
    #[cfg(not(feature = "std"))]
    {
        Error::new(
            ErrorKind::Other,
            alloc::boxed::Box::new(alloc::string::String::from(message)),
        )
    }
}
536
#[cfg(test)]
mod tests {
    use crate::decoding::StreamingDecoder;
    use crate::encoding::{CompressionLevel, Matcher, Sequence, StreamingEncoder};
    use crate::io::{Error, ErrorKind, Read, Write};
    use alloc::vec;
    use alloc::vec::Vec;

    /// Minimal matcher with a configurable window that emits every block as
    /// one literals sequence — lets tests pick tiny block sizes.
    struct TinyMatcher {
        last_space: Vec<u8>,
        window_size: u64,
    }

    impl TinyMatcher {
        fn new(window_size: u64) -> Self {
            Self {
                last_space: Vec::new(),
                window_size,
            }
        }
    }

    impl Matcher for TinyMatcher {
        fn get_next_space(&mut self) -> Vec<u8> {
            vec![0; self.window_size as usize]
        }

        fn get_last_space(&mut self) -> &[u8] {
            self.last_space.as_slice()
        }

        fn commit_space(&mut self, space: Vec<u8>) {
            self.last_space = space;
        }

        fn skip_matching(&mut self) {}

        fn start_matching(&mut self, mut handle_sequence: impl for<'a> FnMut(Sequence<'a>)) {
            handle_sequence(Sequence::Literals {
                literals: self.last_space.as_slice(),
            });
        }

        fn reset(&mut self, _level: CompressionLevel) {
            self.last_space.clear();
        }

        fn window_size(&self) -> u64 {
            self.window_size
        }
    }

    /// Writer that fails exactly once, on the n-th write call.
    struct FailingWriteOnce {
        writes: usize,
        fail_on_write_number: usize,
        sink: Vec<u8>,
    }

    impl FailingWriteOnce {
        fn new(fail_on_write_number: usize) -> Self {
            Self {
                writes: 0,
                fail_on_write_number,
                sink: Vec::new(),
            }
        }
    }

    impl Write for FailingWriteOnce {
        fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
            self.writes += 1;
            if self.writes == self.fail_on_write_number {
                return Err(super::other_error("injected write failure"));
            }
            self.sink.extend_from_slice(buf);
            Ok(buf.len())
        }

        fn flush(&mut self) -> Result<(), Error> {
            Ok(())
        }
    }

    /// Writer that fails on the n-th write with a caller-chosen error kind.
    struct FailingWithKind {
        writes: usize,
        fail_on_write_number: usize,
        kind: ErrorKind,
    }

    impl FailingWithKind {
        fn new(fail_on_write_number: usize, kind: ErrorKind) -> Self {
            Self {
                writes: 0,
                fail_on_write_number,
                kind,
            }
        }
    }

    impl Write for FailingWithKind {
        fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
            self.writes += 1;
            if self.writes == self.fail_on_write_number {
                return Err(Error::from(self.kind));
            }
            Ok(buf.len())
        }

        fn flush(&mut self) -> Result<(), Error> {
            Ok(())
        }
    }

    /// Writer that accepts a short prefix of the n-th write and then fails
    /// every subsequent call, exercising partial-write recovery.
    struct PartialThenFailWriter {
        writes: usize,
        fail_on_write_number: usize,
        partial_prefix_len: usize,
        terminal_failure: bool,
        sink: Vec<u8>,
    }

    impl PartialThenFailWriter {
        fn new(fail_on_write_number: usize, partial_prefix_len: usize) -> Self {
            Self {
                writes: 0,
                fail_on_write_number,
                partial_prefix_len,
                terminal_failure: false,
                sink: Vec::new(),
            }
        }
    }

    impl Write for PartialThenFailWriter {
        fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
            if self.terminal_failure {
                return Err(super::other_error("injected terminal write failure"));
            }

            self.writes += 1;
            if self.writes == self.fail_on_write_number {
                let written = core::cmp::min(self.partial_prefix_len, buf.len());
                if written > 0 {
                    self.sink.extend_from_slice(&buf[..written]);
                    self.terminal_failure = true;
                    return Ok(written);
                }
                return Err(super::other_error("injected terminal write failure"));
            }

            self.sink.extend_from_slice(buf);
            Ok(buf.len())
        }

        fn flush(&mut self) -> Result<(), Error> {
            Ok(())
        }
    }

    #[test]
    fn streaming_encoder_roundtrip_multiple_writes() {
        let payload = b"streaming-encoder-roundtrip-".repeat(1024);
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        for chunk in payload.chunks(313) {
            encoder.write_all(chunk).unwrap();
        }
        let compressed = encoder.finish().unwrap();

        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }

    #[test]
    fn flush_emits_nonempty_partial_output() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.write_all(b"partial-block").unwrap();
        encoder.flush().unwrap();
        let flushed_len = encoder.get_ref().len();
        assert!(
            flushed_len > 0,
            "flush should emit header+partial block bytes"
        );
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, b"partial-block");
    }

    #[test]
    fn flush_without_writes_does_not_emit_frame_header() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.flush().unwrap();
        assert!(encoder.get_ref().is_empty());
    }

    #[test]
    fn block_boundary_write_emits_block_in_same_call() {
        let mut boundary = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            Vec::new(),
            CompressionLevel::Uncompressed,
        );
        let mut below = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            Vec::new(),
            CompressionLevel::Uncompressed,
        );

        boundary.write_all(b"ABCD").unwrap();
        below.write_all(b"ABC").unwrap();

        let boundary_len = boundary.get_ref().len();
        let below_len = below.get_ref().len();
        assert!(
            boundary_len > below_len,
            "full block should be emitted immediately at block boundary"
        );
    }

    #[test]
    fn finish_consumes_encoder_and_emits_frame() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.write_all(b"abc").unwrap();
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, b"abc");
    }

    #[test]
    fn finish_without_writes_emits_empty_frame() {
        let encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert!(decoded.is_empty());
    }

    #[test]
    fn write_empty_buffer_returns_zero() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        assert_eq!(encoder.write(&[]).unwrap(), 0);
        let _ = encoder.finish().unwrap();
    }

    #[test]
    fn uncompressed_level_roundtrip() {
        let payload = b"uncompressed-streaming-roundtrip".repeat(64);
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Uncompressed);
        for chunk in payload.chunks(41) {
            encoder.write_all(chunk).unwrap();
        }
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }

    #[test]
    fn better_level_streaming_roundtrip() {
        let payload = b"better-level-streaming-test".repeat(256);
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Better);
        for chunk in payload.chunks(53) {
            encoder.write_all(chunk).unwrap();
        }
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }

    #[test]
    fn zero_window_matcher_returns_invalid_input_error() {
        let mut encoder = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(0),
            Vec::new(),
            CompressionLevel::Fastest,
        );
        let err = encoder.write_all(b"payload").unwrap_err();
        assert_eq!(err.kind(), ErrorKind::InvalidInput);
    }

    #[test]
    fn best_level_streaming_roundtrip() {
        let payload = b"best-level-streaming-test".repeat(8 * 1024);
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Best);
        for chunk in payload.chunks(53) {
            encoder.write_all(chunk).unwrap();
        }
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }

    #[test]
    fn write_failure_poisoning_is_sticky() {
        let mut encoder = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            FailingWriteOnce::new(1),
            CompressionLevel::Uncompressed,
        );

        assert!(encoder.write_all(b"ABCD").is_err());
        assert!(encoder.flush().is_err());
        assert!(encoder.write_all(b"EFGH").is_err());
        assert_eq!(encoder.get_ref().sink.len(), 0);
        assert!(encoder.finish().is_err());
    }

    #[test]
    fn poisoned_encoder_returns_original_error_kind() {
        let mut encoder = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            FailingWithKind::new(1, ErrorKind::BrokenPipe),
            CompressionLevel::Uncompressed,
        );

        let first_error = encoder.write_all(b"ABCD").unwrap_err();
        assert_eq!(first_error.kind(), ErrorKind::BrokenPipe);

        let second_error = encoder.write_all(b"EFGH").unwrap_err();
        assert_eq!(second_error.kind(), ErrorKind::BrokenPipe);
    }

    #[test]
    fn write_reports_progress_but_poisoning_is_sticky_after_later_block_failure() {
        let payload = b"ABCDEFGHIJKL";
        let mut encoder = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            FailingWriteOnce::new(3),
            CompressionLevel::Uncompressed,
        );

        let first_write = encoder.write(payload).unwrap();
        assert_eq!(first_write, 8);
        assert!(encoder.write(&payload[first_write..]).is_err());
        assert!(encoder.flush().is_err());
        assert!(encoder.write_all(b"EFGH").is_err());
    }

    #[test]
    fn partial_write_failure_after_progress_poisons_encoder() {
        let payload = b"ABCDEFGHIJKL";
        let mut encoder = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            PartialThenFailWriter::new(3, 1),
            CompressionLevel::Uncompressed,
        );

        let first_write = encoder.write(payload).unwrap();
        assert_eq!(first_write, 8);

        let second_write = encoder.write(&payload[first_write..]);
        assert!(second_write.is_err());
        assert!(encoder.flush().is_err());
        assert!(encoder.write_all(b"MNOP").is_err());
    }

    #[test]
    fn new_with_matcher_and_get_mut_work() {
        let matcher = TinyMatcher::new(128 * 1024);
        let mut encoder =
            StreamingEncoder::new_with_matcher(matcher, Vec::new(), CompressionLevel::Fastest);
        encoder.get_mut().extend_from_slice(b"");
        encoder.write_all(b"custom-matcher").unwrap();
        let compressed = encoder.finish().unwrap();
        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, b"custom-matcher");
    }

    #[cfg(feature = "std")]
    #[test]
    fn streaming_encoder_output_decompresses_with_c_zstd() {
        let payload = b"tenant=demo op=put key=streaming value=abcdef\n".repeat(4096);
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        for chunk in payload.chunks(1024) {
            encoder.write_all(chunk).unwrap();
        }
        let compressed = encoder.finish().unwrap();

        let mut decoded = Vec::with_capacity(payload.len());
        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }

    #[test]
    fn pledged_content_size_written_in_header() {
        let payload = b"hello world, pledged size test";
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder
            .set_pledged_content_size(payload.len() as u64)
            .unwrap();
        encoder.write_all(payload).unwrap();
        let compressed = encoder.finish().unwrap();

        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
            .unwrap()
            .0;
        assert_eq!(header.frame_content_size(), payload.len() as u64);

        let mut decoder = StreamingDecoder::new(compressed.as_slice()).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }

    #[test]
    fn pledged_content_size_mismatch_returns_error() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.set_pledged_content_size(100).unwrap();
        encoder.write_all(b"short payload").unwrap();
        let err = encoder.finish().unwrap_err();
        assert_eq!(err.kind(), ErrorKind::InvalidInput);
    }

    #[test]
    fn write_exceeding_pledge_returns_error() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.set_pledged_content_size(5).unwrap();
        let err = encoder.write_all(b"exceeds five bytes").unwrap_err();
        assert_eq!(err.kind(), ErrorKind::InvalidInput);
    }

    #[test]
    fn write_straddling_pledge_reports_partial_progress() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.set_pledged_content_size(5).unwrap();
        assert_eq!(encoder.write(b"abcdef").unwrap(), 5);
        let err = encoder.write(b"g").unwrap_err();
        assert_eq!(err.kind(), ErrorKind::InvalidInput);
    }

    #[test]
    fn pledged_content_size_after_write_returns_error() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.write_all(b"already writing").unwrap();
        let err = encoder.set_pledged_content_size(15).unwrap_err();
        assert_eq!(err.kind(), ErrorKind::InvalidInput);
    }

    #[cfg(feature = "std")]
    #[test]
    fn pledged_content_size_c_zstd_compatible() {
        let payload = b"tenant=demo op=put key=streaming value=abcdef\n".repeat(4096);
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder
            .set_pledged_content_size(payload.len() as u64)
            .unwrap();
        for chunk in payload.chunks(1024) {
            encoder.write_all(chunk).unwrap();
        }
        let compressed = encoder.finish().unwrap();

        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
            .unwrap()
            .0;
        assert_eq!(header.frame_content_size(), payload.len() as u64);

        let mut decoded = Vec::new();
        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }

    #[test]
    fn no_pledged_size_omits_fcs_from_header() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.write_all(b"no pledged size").unwrap();
        let compressed = encoder.finish().unwrap();

        let header = crate::decoding::frame::read_frame_header(compressed.as_slice())
            .unwrap()
            .0;
        assert_eq!(header.frame_content_size(), 0);
        assert_eq!(header.descriptor.frame_content_size_bytes().unwrap(), 0);
    }
}
1034}