Skip to main content

streaming_crypto/core_api/compression/
stream.rs

1// compression/stream.rs
2//! compression/stream.rs
3//! Streaming helpers that respect chunk_size discipline.
4use std::io::Read;
5
6use crate::compression::types::{Compressor, Decompressor, CompressionError};
7
8/// Summary: Compress data read from R in chunk_size blocks, yielding compressed chunks.
9/// - Respects MAX_CHUNK_SIZE sanity.
10/// - Calls compressor.finish() after EOF to flush pending state.
11#[inline]
12pub fn compress_stream<R: Read>(
13    mut r: R,
14    chunk_size: usize,
15    mut compressor: Box<dyn Compressor>
16) -> impl Iterator<Item = Result<Vec<u8>, CompressionError>> {
17    // assert!(chunk_size > 0 && chunk_size <= MAX_CHUNK_SIZE);
18    let mut buf = vec![0u8; chunk_size];
19    let mut eof = false;
20
21    std::iter::from_fn(move || {
22        if eof {
23            // Flush and end stream (once).
24            let mut out = Vec::new();
25            if let Err(e) = compressor.finish(&mut out) {
26                return Some(Err(e));
27            }
28            if out.is_empty() {
29                return None;
30            } else {
31                return Some(Ok(out));
32            }
33        }
34
35        match r.read(&mut buf) {
36            Ok(0) => {
37                eof = true;
38                // Next iteration will flush.
39                return None;
40            }
41            Ok(n) => {
42                let mut out = Vec::new();
43                if let Err(e) = compressor.compress_chunk(&buf[..n], &mut out) {
44                    return Some(Err(e));
45                }
46                Some(Ok(out))
47            }
48            Err(_) => Some(Err(CompressionError::StateError("read error".into()))),
49        }
50    })
51}
52
53/// Summary: Decompress data read from R in chunk_size blocks, yielding decompressed chunks.
54/// - Respects MAX_CHUNK_SIZE sanity.
55/// - Stateless with respect to frame boundaries (caller controls boundaries).
56#[inline]
57pub fn decompress_stream<R: Read>(
58    mut r: R,
59    chunk_size: usize,
60    mut decompressor: Box<dyn Decompressor>
61) -> impl Iterator<Item = Result<Vec<u8>, CompressionError>> {
62    // assert!(chunk_size > 0 && chunk_size <= MAX_CHUNK_SIZE);
63    let mut buf = vec![0u8; chunk_size];
64
65    std::iter::from_fn(move || {
66        match r.read(&mut buf) {
67            Ok(0) => None,
68            Ok(n) => {
69                let mut out = Vec::new();
70                if let Err(e) = decompressor.decompress_chunk(&buf[..n], &mut out) {
71                    return Some(Err(e));
72                }
73                Some(Ok(out))
74            }
75            Err(_) => Some(Err(CompressionError::StateError("read error".into()))),
76        }
77    })
78}