1use alloc::format;
2use alloc::string::{String, ToString};
3use alloc::vec::Vec;
4use core::mem;
5
6use crate::common::MAX_BLOCK_SIZE;
7#[cfg(feature = "hash")]
8use core::hash::Hasher;
9#[cfg(feature = "hash")]
10use twox_hash::XxHash64;
11
12use crate::encoding::levels::compress_block_encoded;
13use crate::encoding::{
14 CompressionLevel, MatchGeneratorDriver, Matcher, block_header::BlockHeader,
15 frame_compressor::CompressState, frame_compressor::FseTables, frame_header::FrameHeader,
16};
17use crate::io::{Error, ErrorKind, Write};
18
/// Incremental frame encoder: bytes handed to it through [`Write`] are buffered,
/// encoded block by block, and forwarded to the wrapped drain `W`.
///
/// The matcher type `M` defaults to [`MatchGeneratorDriver`].
pub struct StreamingEncoder<W: Write, M: Matcher = MatchGeneratorDriver> {
    /// Output sink. It is `Some` from construction until `finish` takes it out.
    drain: Option<W>,
    /// Level handed to the matcher on frame start and used to pick the block encoding path.
    compression_level: CompressionLevel,
    /// Matcher plus the Huffman/FSE table and offset-history state shared across blocks.
    state: CompressState<M>,
    /// Uncompressed bytes accepted by `write` that have not been emitted as a block yet.
    pending: Vec<u8>,
    /// Set by `fail`; while `true`, `ensure_open` rejects every operation.
    errored: bool,
    /// Kind of the first failure recorded by `fail`.
    last_error_kind: Option<ErrorKind>,
    /// Display text of the first failure recorded by `fail`.
    last_error_message: Option<String>,
    /// Whether the frame header has already been written to the drain.
    frame_started: bool,
    /// Running hash over the uncompressed bytes of every successfully written block.
    #[cfg(feature = "hash")]
    hasher: XxHash64,
}
36
37impl<W: Write> StreamingEncoder<W, MatchGeneratorDriver> {
38 pub fn new(drain: W, compression_level: CompressionLevel) -> Self {
43 Self::new_with_matcher(
44 MatchGeneratorDriver::new(MAX_BLOCK_SIZE as usize, 1),
45 drain,
46 compression_level,
47 )
48 }
49}
50
51impl<W: Write, M: Matcher> StreamingEncoder<W, M> {
52 pub fn new_with_matcher(matcher: M, drain: W, compression_level: CompressionLevel) -> Self {
57 Self {
58 drain: Some(drain),
59 compression_level,
60 state: CompressState {
61 matcher,
62 last_huff_table: None,
63 fse_tables: FseTables::new(),
64 offset_hist: [1, 4, 8],
65 },
66 pending: Vec::new(),
67 errored: false,
68 last_error_kind: None,
69 last_error_message: None,
70 frame_started: false,
71 #[cfg(feature = "hash")]
72 hasher: XxHash64::with_seed(0),
73 }
74 }
75
76 pub fn get_ref(&self) -> &W {
81 self.drain
82 .as_ref()
83 .expect("streaming encoder drain is present until finish consumes self")
84 }
85
86 pub fn get_mut(&mut self) -> &mut W {
94 self.drain
95 .as_mut()
96 .expect("streaming encoder drain is present until finish consumes self")
97 }
98
99 pub fn finish(mut self) -> Result<W, Error> {
104 self.ensure_open()?;
105 self.ensure_frame_started()?;
106
107 if self.pending.is_empty() {
108 self.write_empty_last_block()
109 .map_err(|err| self.fail(err))?;
110 } else {
111 self.emit_pending_block(true)?;
112 }
113
114 let mut drain = self
115 .drain
116 .take()
117 .expect("streaming encoder drain must be present when finishing");
118
119 #[cfg(feature = "hash")]
120 {
121 let checksum = self.hasher.finish() as u32;
122 drain
123 .write_all(&checksum.to_le_bytes())
124 .map_err(|err| self.fail(err))?;
125 }
126
127 drain.flush().map_err(|err| self.fail(err))?;
128 Ok(drain)
129 }
130
131 fn ensure_open(&self) -> Result<(), Error> {
132 if self.errored {
133 return Err(self.sticky_error());
134 }
135 Ok(())
136 }
137
138 fn sticky_error(&self) -> Error {
142 match (self.last_error_kind, self.last_error_message.as_deref()) {
143 (Some(kind), Some(message)) => error_with_kind_message(
144 kind,
145 format!(
146 "streaming encoder is in an errored state due to previous {kind:?} failure: {message}"
147 ),
148 ),
149 (Some(kind), None) => error_from_kind(kind),
150 (None, Some(message)) => other_error_owned(format!(
151 "streaming encoder is in an errored state: {message}"
152 )),
153 (None, None) => other_error("streaming encoder is in an errored state"),
154 }
155 }
156
157 fn drain_mut(&mut self) -> Result<&mut W, Error> {
158 self.drain
159 .as_mut()
160 .ok_or_else(|| other_error("streaming encoder has no active drain"))
161 }
162
163 fn ensure_frame_started(&mut self) -> Result<(), Error> {
164 if self.frame_started {
165 return Ok(());
166 }
167
168 self.ensure_level_supported()?;
169 self.state.matcher.reset(self.compression_level);
170 self.state.offset_hist = [1, 4, 8];
171 self.state.last_huff_table = None;
172 self.state.fse_tables.ll_previous = None;
173 self.state.fse_tables.ml_previous = None;
174 self.state.fse_tables.of_previous = None;
175 #[cfg(feature = "hash")]
176 {
177 self.hasher = XxHash64::with_seed(0);
178 }
179
180 let window_size = self.state.matcher.window_size();
181 if window_size == 0 {
182 return Err(invalid_input_error(
183 "matcher reported window_size == 0, which is invalid",
184 ));
185 }
186
187 let header = FrameHeader {
188 frame_content_size: None,
189 single_segment: false,
190 content_checksum: cfg!(feature = "hash"),
191 dictionary_id: None,
192 window_size: Some(window_size),
193 };
194 let mut encoded_header = Vec::new();
195 header.serialize(&mut encoded_header);
196 self.drain_mut()
197 .and_then(|drain| drain.write_all(&encoded_header))
198 .map_err(|err| self.fail(err))?;
199
200 self.frame_started = true;
201 Ok(())
202 }
203
204 fn block_capacity(&self) -> usize {
205 let matcher_window = self.state.matcher.window_size() as usize;
206 core::cmp::max(1, core::cmp::min(matcher_window, MAX_BLOCK_SIZE as usize))
207 }
208
209 fn allocate_pending_space(&mut self, block_capacity: usize) -> Vec<u8> {
210 let mut space = match self.compression_level {
211 CompressionLevel::Fastest
212 | CompressionLevel::Default
213 | CompressionLevel::Better
214 | CompressionLevel::Best => self.state.matcher.get_next_space(),
215 _ => Vec::new(),
216 };
217 space.clear();
218 if space.capacity() > block_capacity {
219 space.shrink_to(block_capacity);
220 }
221 if space.capacity() < block_capacity {
222 space.reserve(block_capacity - space.capacity());
223 }
224 space
225 }
226
227 fn emit_full_pending_block(
228 &mut self,
229 block_capacity: usize,
230 consumed: usize,
231 ) -> Option<Result<usize, Error>> {
232 if self.pending.len() != block_capacity {
233 return None;
234 }
235
236 let new_pending = self.allocate_pending_space(block_capacity);
237 let full_block = mem::replace(&mut self.pending, new_pending);
238 if let Err((err, restored_block)) = self.encode_block(full_block, false) {
239 self.pending = restored_block;
240 let err = self.fail(err);
241 if consumed > 0 {
242 return Some(Ok(consumed));
243 }
244 return Some(Err(err));
245 }
246 None
247 }
248
249 fn emit_pending_block(&mut self, last_block: bool) -> Result<(), Error> {
250 let block = mem::take(&mut self.pending);
251 if let Err((err, restored_block)) = self.encode_block(block, last_block) {
252 self.pending = restored_block;
253 return Err(self.fail(err));
254 }
255 if !last_block {
256 let block_capacity = self.block_capacity();
257 self.pending = self.allocate_pending_space(block_capacity);
258 }
259 Ok(())
260 }
261
262 fn ensure_level_supported(&self) -> Result<(), Error> {
266 match self.compression_level {
267 CompressionLevel::Uncompressed
268 | CompressionLevel::Fastest
269 | CompressionLevel::Default
270 | CompressionLevel::Better
271 | CompressionLevel::Best => Ok(()),
272 }
273 }
274
275 fn encode_block(
276 &mut self,
277 uncompressed_data: Vec<u8>,
278 last_block: bool,
279 ) -> Result<(), (Error, Vec<u8>)> {
280 let mut raw_block = Some(uncompressed_data);
281 let mut encoded = Vec::with_capacity(self.block_capacity() + 3);
283 let mut moved_into_matcher = false;
284 if raw_block.as_ref().is_some_and(|block| block.is_empty()) {
285 let header = BlockHeader {
286 last_block,
287 block_type: crate::blocks::block::BlockType::Raw,
288 block_size: 0,
289 };
290 header.serialize(&mut encoded);
291 } else {
292 match self.compression_level {
293 CompressionLevel::Uncompressed => {
294 let block = raw_block.as_ref().expect("raw block missing");
295 let header = BlockHeader {
296 last_block,
297 block_type: crate::blocks::block::BlockType::Raw,
298 block_size: block.len() as u32,
299 };
300 header.serialize(&mut encoded);
301 encoded.extend_from_slice(block);
302 }
303 CompressionLevel::Fastest
304 | CompressionLevel::Default
305 | CompressionLevel::Better
306 | CompressionLevel::Best => {
307 let block = raw_block.take().expect("raw block missing");
308 debug_assert!(!block.is_empty(), "empty blocks handled above");
309 compress_block_encoded(&mut self.state, last_block, block, &mut encoded);
310 moved_into_matcher = true;
311 }
312 }
313 }
314
315 if let Err(err) = self.drain_mut().and_then(|drain| drain.write_all(&encoded)) {
316 let restored = if moved_into_matcher {
317 self.state.matcher.get_last_space().to_vec()
318 } else {
319 raw_block.unwrap_or_default()
320 };
321 return Err((err, restored));
322 }
323
324 if moved_into_matcher {
325 #[cfg(feature = "hash")]
326 {
327 self.hasher.write(self.state.matcher.get_last_space());
328 }
329 } else {
330 self.hash_block(raw_block.as_deref().unwrap_or(&[]));
331 }
332 Ok(())
333 }
334
335 fn write_empty_last_block(&mut self) -> Result<(), Error> {
336 self.encode_block(Vec::new(), true).map_err(|(err, _)| err)
337 }
338
339 fn fail(&mut self, err: Error) -> Error {
340 self.errored = true;
341 if self.last_error_kind.is_none() {
342 self.last_error_kind = Some(err.kind());
343 }
344 if self.last_error_message.is_none() {
345 self.last_error_message = Some(err.to_string());
346 }
347 err
348 }
349
350 #[cfg(feature = "hash")]
351 fn hash_block(&mut self, uncompressed_data: &[u8]) {
352 self.hasher.write(uncompressed_data);
353 }
354
355 #[cfg(not(feature = "hash"))]
356 fn hash_block(&mut self, _uncompressed_data: &[u8]) {}
357}
358
359impl<W: Write, M: Matcher> Write for StreamingEncoder<W, M> {
360 fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
361 self.ensure_open()?;
362 if buf.is_empty() {
363 return Ok(0);
364 }
365
366 self.ensure_frame_started()?;
367 let block_capacity = self.block_capacity();
368 if self.pending.capacity() == 0 {
369 self.pending = self.allocate_pending_space(block_capacity);
370 }
371 let mut remaining = buf;
372 let mut consumed = 0usize;
373
374 while !remaining.is_empty() {
375 if let Some(result) = self.emit_full_pending_block(block_capacity, consumed) {
376 return result;
377 }
378
379 let available = block_capacity - self.pending.len();
380 let to_take = core::cmp::min(remaining.len(), available);
381 if to_take == 0 {
382 break;
383 }
384 self.pending.extend_from_slice(&remaining[..to_take]);
385 remaining = &remaining[to_take..];
386 consumed += to_take;
387
388 if let Some(result) = self.emit_full_pending_block(block_capacity, consumed) {
389 return result;
390 }
391 }
392 Ok(consumed)
393 }
394
395 fn flush(&mut self) -> Result<(), Error> {
396 self.ensure_open()?;
397 if self.pending.is_empty() {
398 return self
399 .drain_mut()
400 .and_then(|drain| drain.flush())
401 .map_err(|err| self.fail(err));
402 }
403 self.ensure_frame_started()?;
404 self.emit_pending_block(false)?;
405 self.drain_mut()
406 .and_then(|drain| drain.flush())
407 .map_err(|err| self.fail(err))
408 }
409}
410
411fn error_from_kind(kind: ErrorKind) -> Error {
412 Error::from(kind)
413}
414
415fn error_with_kind_message(kind: ErrorKind, message: String) -> Error {
416 #[cfg(feature = "std")]
417 {
418 Error::new(kind, message)
419 }
420 #[cfg(not(feature = "std"))]
421 {
422 Error::new(kind, alloc::boxed::Box::new(message))
423 }
424}
425
426fn invalid_input_error(message: &str) -> Error {
427 #[cfg(feature = "std")]
428 {
429 Error::new(ErrorKind::InvalidInput, message)
430 }
431 #[cfg(not(feature = "std"))]
432 {
433 Error::new(
434 ErrorKind::Other,
435 alloc::boxed::Box::new(alloc::string::String::from(message)),
436 )
437 }
438}
439
440fn other_error_owned(message: String) -> Error {
441 #[cfg(feature = "std")]
442 {
443 Error::other(message)
444 }
445 #[cfg(not(feature = "std"))]
446 {
447 Error::new(ErrorKind::Other, alloc::boxed::Box::new(message))
448 }
449}
450
451fn other_error(message: &str) -> Error {
452 #[cfg(feature = "std")]
453 {
454 Error::other(message)
455 }
456 #[cfg(not(feature = "std"))]
457 {
458 Error::new(
459 ErrorKind::Other,
460 alloc::boxed::Box::new(alloc::string::String::from(message)),
461 )
462 }
463}
464
#[cfg(test)]
mod tests {
    use crate::decoding::StreamingDecoder;
    use crate::encoding::{CompressionLevel, Matcher, Sequence, StreamingEncoder};
    use crate::io::{Error, ErrorKind, Read, Write};
    use alloc::vec;
    use alloc::vec::Vec;

    /// Matcher stub with a configurable window: it never reports matches and
    /// replays the last committed space as a single literals sequence.
    struct TinyMatcher {
        last_space: Vec<u8>,
        window_size: u64,
    }

    impl TinyMatcher {
        fn new(window_size: u64) -> Self {
            let last_space = Vec::new();
            Self {
                last_space,
                window_size,
            }
        }
    }

    impl Matcher for TinyMatcher {
        fn get_next_space(&mut self) -> Vec<u8> {
            vec![0; self.window_size as usize]
        }

        fn get_last_space(&mut self) -> &[u8] {
            &self.last_space
        }

        fn commit_space(&mut self, space: Vec<u8>) {
            self.last_space = space;
        }

        fn skip_matching(&mut self) {}

        fn start_matching(&mut self, mut handle_sequence: impl for<'a> FnMut(Sequence<'a>)) {
            let literals = self.last_space.as_slice();
            handle_sequence(Sequence::Literals { literals });
        }

        fn reset(&mut self, _level: CompressionLevel) {
            self.last_space.clear();
        }

        fn window_size(&self) -> u64 {
            self.window_size
        }
    }

    /// Writer that rejects exactly the N-th `write` call and stores everything else.
    struct FailingWriteOnce {
        writes: usize,
        fail_on_write_number: usize,
        sink: Vec<u8>,
    }

    impl FailingWriteOnce {
        fn new(fail_on_write_number: usize) -> Self {
            Self {
                writes: 0,
                fail_on_write_number,
                sink: Vec::new(),
            }
        }
    }

    impl Write for FailingWriteOnce {
        fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
            self.writes += 1;
            if self.writes != self.fail_on_write_number {
                self.sink.extend_from_slice(buf);
                return Ok(buf.len());
            }
            Err(super::other_error("injected write failure"))
        }

        fn flush(&mut self) -> Result<(), Error> {
            Ok(())
        }
    }

    /// Writer that rejects exactly the N-th `write` call with a chosen error kind
    /// and discards everything else.
    struct FailingWithKind {
        writes: usize,
        fail_on_write_number: usize,
        kind: ErrorKind,
    }

    impl FailingWithKind {
        fn new(fail_on_write_number: usize, kind: ErrorKind) -> Self {
            Self {
                writes: 0,
                fail_on_write_number,
                kind,
            }
        }
    }

    impl Write for FailingWithKind {
        fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
            self.writes += 1;
            if self.writes != self.fail_on_write_number {
                return Ok(buf.len());
            }
            Err(Error::from(self.kind))
        }

        fn flush(&mut self) -> Result<(), Error> {
            Ok(())
        }
    }

    /// Writer whose N-th `write` call accepts only a short prefix and which
    /// fails every call after that.
    struct PartialThenFailWriter {
        writes: usize,
        fail_on_write_number: usize,
        partial_prefix_len: usize,
        terminal_failure: bool,
        sink: Vec<u8>,
    }

    impl PartialThenFailWriter {
        fn new(fail_on_write_number: usize, partial_prefix_len: usize) -> Self {
            Self {
                writes: 0,
                fail_on_write_number,
                partial_prefix_len,
                terminal_failure: false,
                sink: Vec::new(),
            }
        }
    }

    impl Write for PartialThenFailWriter {
        fn write(&mut self, buf: &[u8]) -> Result<usize, Error> {
            if self.terminal_failure {
                return Err(super::other_error("injected terminal write failure"));
            }

            self.writes += 1;
            if self.writes != self.fail_on_write_number {
                self.sink.extend_from_slice(buf);
                return Ok(buf.len());
            }

            match self.partial_prefix_len.min(buf.len()) {
                0 => Err(super::other_error("injected terminal write failure")),
                written => {
                    self.sink.extend_from_slice(&buf[..written]);
                    self.terminal_failure = true;
                    Ok(written)
                }
            }
        }

        fn flush(&mut self) -> Result<(), Error> {
            Ok(())
        }
    }

    /// Decodes a complete frame with the crate's own streaming decoder.
    fn decode_frame(compressed: &[u8]) -> Vec<u8> {
        let mut decoder = StreamingDecoder::new(compressed).unwrap();
        let mut decoded = Vec::new();
        decoder.read_to_end(&mut decoded).unwrap();
        decoded
    }

    /// Feeds `payload` to a default-matcher encoder in `chunk_len`-byte writes
    /// and returns the finished frame.
    fn encode_in_chunks(level: CompressionLevel, payload: &[u8], chunk_len: usize) -> Vec<u8> {
        let mut encoder = StreamingEncoder::new(Vec::new(), level);
        for chunk in payload.chunks(chunk_len) {
            encoder.write_all(chunk).unwrap();
        }
        encoder.finish().unwrap()
    }

    /// Uncompressed-level encoder over a four-byte `TinyMatcher` window, so a
    /// block fills up after four input bytes.
    fn tiny_uncompressed<W: Write>(drain: W) -> StreamingEncoder<W, TinyMatcher> {
        StreamingEncoder::new_with_matcher(
            TinyMatcher::new(4),
            drain,
            CompressionLevel::Uncompressed,
        )
    }

    #[test]
    fn streaming_encoder_roundtrip_multiple_writes() {
        let payload = b"streaming-encoder-roundtrip-".repeat(1024);
        let compressed = encode_in_chunks(CompressionLevel::Fastest, &payload, 313);
        assert_eq!(decode_frame(&compressed), payload);
    }

    #[test]
    fn flush_emits_nonempty_partial_output() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.write_all(b"partial-block").unwrap();
        encoder.flush().unwrap();
        let flushed_len = encoder.get_ref().len();
        assert!(
            flushed_len > 0,
            "flush should emit header+partial block bytes"
        );
        let compressed = encoder.finish().unwrap();
        assert_eq!(decode_frame(&compressed), b"partial-block");
    }

    #[test]
    fn flush_without_writes_does_not_emit_frame_header() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.flush().unwrap();
        assert!(encoder.get_ref().is_empty());
    }

    #[test]
    fn block_boundary_write_emits_block_in_same_call() {
        let mut boundary = tiny_uncompressed(Vec::new());
        let mut below = tiny_uncompressed(Vec::new());

        boundary.write_all(b"ABCD").unwrap();
        below.write_all(b"ABC").unwrap();

        let boundary_len = boundary.get_ref().len();
        let below_len = below.get_ref().len();
        assert!(
            boundary_len > below_len,
            "full block should be emitted immediately at block boundary"
        );
    }

    #[test]
    fn finish_consumes_encoder_and_emits_frame() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        encoder.write_all(b"abc").unwrap();
        let compressed = encoder.finish().unwrap();
        assert_eq!(decode_frame(&compressed), b"abc");
    }

    #[test]
    fn finish_without_writes_emits_empty_frame() {
        let encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        let compressed: Vec<u8> = encoder.finish().unwrap();
        assert!(decode_frame(&compressed).is_empty());
    }

    #[test]
    fn write_empty_buffer_returns_zero() {
        let mut encoder = StreamingEncoder::new(Vec::new(), CompressionLevel::Fastest);
        assert_eq!(encoder.write(&[]).unwrap(), 0);
        let _ = encoder.finish().unwrap();
    }

    #[test]
    fn uncompressed_level_roundtrip() {
        let payload = b"uncompressed-streaming-roundtrip".repeat(64);
        let compressed = encode_in_chunks(CompressionLevel::Uncompressed, &payload, 41);
        assert_eq!(decode_frame(&compressed), payload);
    }

    #[test]
    fn better_level_streaming_roundtrip() {
        let payload = b"better-level-streaming-test".repeat(256);
        let compressed = encode_in_chunks(CompressionLevel::Better, &payload, 53);
        assert_eq!(decode_frame(&compressed), payload);
    }

    #[test]
    fn zero_window_matcher_returns_invalid_input_error() {
        let mut encoder = StreamingEncoder::new_with_matcher(
            TinyMatcher::new(0),
            Vec::new(),
            CompressionLevel::Fastest,
        );
        let err = encoder.write_all(b"payload").unwrap_err();
        assert_eq!(err.kind(), ErrorKind::InvalidInput);
    }

    #[test]
    fn best_level_streaming_roundtrip() {
        let payload = b"best-level-streaming-test".repeat(8 * 1024);
        let compressed = encode_in_chunks(CompressionLevel::Best, &payload, 53);
        assert_eq!(decode_frame(&compressed), payload);
    }

    #[test]
    fn write_failure_poisoning_is_sticky() {
        // Write #1 is the frame header, so the very first operation fails.
        let mut encoder = tiny_uncompressed(FailingWriteOnce::new(1));

        assert!(encoder.write_all(b"ABCD").is_err());
        assert!(encoder.flush().is_err());
        assert!(encoder.write_all(b"EFGH").is_err());
        assert_eq!(encoder.get_ref().sink.len(), 0);
        assert!(encoder.finish().is_err());
    }

    #[test]
    fn poisoned_encoder_returns_original_error_kind() {
        let mut encoder = tiny_uncompressed(FailingWithKind::new(1, ErrorKind::BrokenPipe));

        let first_error = encoder.write_all(b"ABCD").unwrap_err();
        assert_eq!(first_error.kind(), ErrorKind::BrokenPipe);

        let second_error = encoder.write_all(b"EFGH").unwrap_err();
        assert_eq!(second_error.kind(), ErrorKind::BrokenPipe);
    }

    #[test]
    fn write_reports_progress_but_poisoning_is_sticky_after_later_block_failure() {
        let payload = b"ABCDEFGHIJKL";
        let mut encoder = tiny_uncompressed(FailingWriteOnce::new(3));

        let first_write = encoder.write(payload).unwrap();
        assert_eq!(first_write, 8);
        assert!(encoder.write(&payload[first_write..]).is_err());
        assert!(encoder.flush().is_err());
        assert!(encoder.write_all(b"EFGH").is_err());
    }

    #[test]
    fn partial_write_failure_after_progress_poisons_encoder() {
        let payload = b"ABCDEFGHIJKL";
        let mut encoder = tiny_uncompressed(PartialThenFailWriter::new(3, 1));

        let first_write = encoder.write(payload).unwrap();
        assert_eq!(first_write, 8);

        let second_write = encoder.write(&payload[first_write..]);
        assert!(second_write.is_err());
        assert!(encoder.flush().is_err());
        assert!(encoder.write_all(b"MNOP").is_err());
    }

    #[test]
    fn new_with_matcher_and_get_mut_work() {
        let matcher = TinyMatcher::new(128 * 1024);
        let mut encoder =
            StreamingEncoder::new_with_matcher(matcher, Vec::new(), CompressionLevel::Fastest);
        encoder.get_mut().extend_from_slice(b"");
        encoder.write_all(b"custom-matcher").unwrap();
        let compressed = encoder.finish().unwrap();
        assert_eq!(decode_frame(&compressed), b"custom-matcher");
    }

    #[cfg(feature = "std")]
    #[test]
    fn streaming_encoder_output_decompresses_with_c_zstd() {
        let payload = b"tenant=demo op=put key=streaming value=abcdef\n".repeat(4096);
        let compressed = encode_in_chunks(CompressionLevel::Fastest, &payload, 1024);

        let mut decoded = Vec::with_capacity(payload.len());
        zstd::stream::copy_decode(compressed.as_slice(), &mut decoded).unwrap();
        assert_eq!(decoded, payload);
    }
}