// ipfrs_core/streaming_compression.rs

//! Streaming compression and decompression support
//!
//! This module provides streaming compression and decompression capabilities for large data
//! that cannot or should not be loaded entirely into memory. It integrates with the async
//! streaming infrastructure and supports all compression algorithms.
//!
//! # Features
//!
//! - **Async streaming compression** - Compress data on-the-fly as it's streamed
//! - **Async streaming decompression** - Decompress data on-the-fly as it's streamed
//! - **All algorithms supported** - Zstd, LZ4, and passthrough (None)
//! - **Configurable buffer sizes** - Tune memory usage vs performance
//! - **Statistics tracking** - Monitor bytes processed and compression ratios
//!
//! # Example
//!
//! ```rust
//! use ipfrs_core::streaming_compression::{CompressingStream, CompressionAlgorithm};
//! use tokio::io::AsyncReadExt;
//! use bytes::Bytes;
//!
//! # #[tokio::main]
//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
//! // Create data to compress
//! let data = Bytes::from(b"Hello, world! ".repeat(1000));
//! let cursor = std::io::Cursor::new(data.to_vec());
//!
//! // Create a compressing stream
//! let mut stream = CompressingStream::new(cursor, CompressionAlgorithm::Zstd, 3)?;
//!
//! // Read compressed data
//! let mut compressed = Vec::new();
//! stream.read_to_end(&mut compressed).await?;
//!
//! println!("Original: {} bytes", data.len());
//! println!("Compressed: {} bytes", compressed.len());
//! println!("Ratio: {:.2}%", (compressed.len() as f64 / data.len() as f64) * 100.0);
//! # Ok(())
//! # }
//! ```

use crate::error::{Error, Result};
use bytes::{Bytes, BytesMut};
use std::io::Cursor;
use std::pin::Pin;
use std::task::{Context, Poll};
use tokio::io::{AsyncRead, ReadBuf};

// Re-export CompressionAlgorithm for convenience in doc tests
pub use crate::compression::CompressionAlgorithm;

/// Buffer size for streaming operations (64KB)
const DEFAULT_BUFFER_SIZE: usize = 64 * 1024;

/// A streaming compressor that compresses data on-the-fly
///
/// This struct wraps an `AsyncRead` source and compresses the data as it's read,
/// providing an efficient way to compress large files without loading them entirely
/// into memory.
///
/// Each internal read of up to `buffer_size` bytes is compressed independently,
/// so the output is a sequence of self-contained compressed chunks rather than
/// one continuous frame. NOTE(review): such output appears to be meant for
/// consumption by `DecompressingStream` with matching read boundaries — confirm
/// against callers before feeding it to an external decompressor.
///
/// # Example
///
/// ```rust
/// use ipfrs_core::streaming_compression::{CompressingStream, CompressionAlgorithm};
/// use tokio::io::AsyncReadExt;
/// use bytes::Bytes;
///
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// let data = b"Hello, streaming compression!".repeat(100);
/// let cursor = std::io::Cursor::new(data.clone());
///
/// let mut stream = CompressingStream::new(cursor, CompressionAlgorithm::Zstd, 5)?;
///
/// let mut compressed = Vec::new();
/// stream.read_to_end(&mut compressed).await?;
///
/// let stats = stream.stats();
/// println!("Compressed {} bytes to {} bytes", stats.bytes_read, stats.bytes_written);
/// # Ok(())
/// # }
/// ```
pub struct CompressingStream<R: AsyncRead + Unpin> {
    // Source of uncompressed data.
    reader: R,
    // Algorithm applied to each chunk.
    algorithm: CompressionAlgorithm,
    // Compression level, validated to 0-9 at construction.
    level: u8,
    // Scratch buffer for raw bytes read from `reader`.
    buffer: BytesMut,
    // Compressed bytes not yet handed to the caller; position tracks drain progress.
    compressed_buffer: Cursor<Vec<u8>>,
    // Running byte counters (raw in, compressed out).
    stats: StreamingStats,
    // Set once the underlying reader reports EOF.
    finished: bool,
    // Number of raw bytes read (and compressed) per chunk.
    buffer_size: usize,
}

94impl<R: AsyncRead + Unpin> CompressingStream<R> {
95    /// Create a new compressing stream
96    ///
97    /// # Arguments
98    ///
99    /// * `reader` - The source to read uncompressed data from
100    /// * `algorithm` - The compression algorithm to use
101    /// * `level` - Compression level (0-9)
102    ///
103    /// # Returns
104    ///
105    /// A new `CompressingStream` instance
106    pub fn new(reader: R, algorithm: CompressionAlgorithm, level: u8) -> Result<Self> {
107        if level > 9 {
108            return Err(Error::InvalidInput(format!(
109                "compression level must be 0-9, got {}",
110                level
111            )));
112        }
113
114        Ok(Self {
115            reader,
116            algorithm,
117            level,
118            buffer: BytesMut::with_capacity(DEFAULT_BUFFER_SIZE),
119            compressed_buffer: Cursor::new(Vec::new()),
120            stats: StreamingStats::default(),
121            finished: false,
122            buffer_size: DEFAULT_BUFFER_SIZE,
123        })
124    }
125
126    /// Create a new compressing stream with a custom buffer size
127    ///
128    /// # Arguments
129    ///
130    /// * `reader` - The source to read uncompressed data from
131    /// * `algorithm` - The compression algorithm to use
132    /// * `level` - Compression level (0-9)
133    /// * `buffer_size` - Size of the internal buffer in bytes
134    pub fn with_buffer_size(
135        reader: R,
136        algorithm: CompressionAlgorithm,
137        level: u8,
138        buffer_size: usize,
139    ) -> Result<Self> {
140        if level > 9 {
141            return Err(Error::InvalidInput(format!(
142                "compression level must be 0-9, got {}",
143                level
144            )));
145        }
146
147        Ok(Self {
148            reader,
149            algorithm,
150            level,
151            buffer: BytesMut::with_capacity(buffer_size),
152            compressed_buffer: Cursor::new(Vec::new()),
153            stats: StreamingStats::default(),
154            finished: false,
155            buffer_size,
156        })
157    }
158
159    /// Get statistics about the compression operation
160    pub fn stats(&self) -> &StreamingStats {
161        &self.stats
162    }
163}
164
165impl<R: AsyncRead + Unpin> AsyncRead for CompressingStream<R> {
166    fn poll_read(
167        mut self: Pin<&mut Self>,
168        cx: &mut Context<'_>,
169        buf: &mut ReadBuf<'_>,
170    ) -> Poll<std::io::Result<()>> {
171        // Try to read from compressed buffer first
172        let pos = self.compressed_buffer.position() as usize;
173        let available = self.compressed_buffer.get_ref().len() - pos;
174
175        if available > 0 {
176            let to_copy = available.min(buf.remaining());
177            buf.put_slice(&self.compressed_buffer.get_ref()[pos..pos + to_copy]);
178            self.compressed_buffer.set_position((pos + to_copy) as u64);
179            return Poll::Ready(Ok(()));
180        }
181
182        // If finished and no more data, return EOF
183        if self.finished {
184            return Poll::Ready(Ok(()));
185        }
186
187        // Need to read more data from source
188        // Get self as mut ref to avoid borrow checker issues
189        let this = &mut *self;
190
191        this.buffer.resize(this.buffer_size, 0);
192        let mut read_buf = ReadBuf::new(&mut this.buffer[..]);
193
194        match Pin::new(&mut this.reader).poll_read(cx, &mut read_buf) {
195            Poll::Ready(Ok(())) => {
196                let n = read_buf.filled().len();
197
198                if n == 0 {
199                    this.finished = true;
200                    return Poll::Ready(Ok(()));
201                }
202
203                this.stats.bytes_read += n as u64;
204
205                // Compress the data
206                let data = Bytes::from(this.buffer[..n].to_vec());
207                let compressed =
208                    match crate::compression::compress(&data, this.algorithm, this.level) {
209                        Ok(c) => c,
210                        Err(e) => return Poll::Ready(Err(std::io::Error::other(e.to_string()))),
211                    };
212
213                this.stats.bytes_written += compressed.len() as u64;
214                this.compressed_buffer = Cursor::new(compressed.to_vec());
215
216                // Now read from the compressed buffer
217                let pos = this.compressed_buffer.position() as usize;
218                let available = this.compressed_buffer.get_ref().len() - pos;
219
220                if available > 0 {
221                    let to_copy = available.min(buf.remaining());
222                    buf.put_slice(&this.compressed_buffer.get_ref()[pos..pos + to_copy]);
223                    this.compressed_buffer.set_position((pos + to_copy) as u64);
224                }
225
226                Poll::Ready(Ok(()))
227            }
228            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
229            Poll::Pending => Poll::Pending,
230        }
231    }
232}
233
/// A streaming decompressor that decompresses data on-the-fly
///
/// This struct wraps an `AsyncRead` source and decompresses the data as it's read,
/// providing an efficient way to decompress large files without loading them entirely
/// into memory.
///
/// # Example
///
/// ```rust
/// use ipfrs_core::streaming_compression::{CompressingStream, DecompressingStream, CompressionAlgorithm};
/// use tokio::io::{AsyncReadExt, AsyncWriteExt};
/// use bytes::Bytes;
///
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// // First compress some data
/// let original = b"Hello, streaming decompression!".repeat(100);
/// let cursor = std::io::Cursor::new(original.clone());
/// let mut compressor = CompressingStream::new(cursor, CompressionAlgorithm::Zstd, 5)?;
///
/// let mut compressed = Vec::new();
/// compressor.read_to_end(&mut compressed).await?;
///
/// // Now decompress it
/// let cursor = std::io::Cursor::new(compressed);
/// let mut decompressor = DecompressingStream::new(cursor, CompressionAlgorithm::Zstd)?;
///
/// let mut decompressed = Vec::new();
/// decompressor.read_to_end(&mut decompressed).await?;
///
/// assert_eq!(original, decompressed);
/// # Ok(())
/// # }
/// ```
pub struct DecompressingStream<R: AsyncRead + Unpin> {
    // Source of compressed data.
    reader: R,
    // Algorithm that was used to compress the data.
    algorithm: CompressionAlgorithm,
    // Scratch buffer for compressed bytes read from `reader`.
    buffer: BytesMut,
    // Decompressed bytes not yet handed to the caller; position tracks drain progress.
    decompressed_buffer: Cursor<Vec<u8>>,
    // Running byte counters (compressed in, decompressed out).
    stats: StreamingStats,
    // Set once the underlying reader reports EOF.
    finished: bool,
    // Maximum number of compressed bytes read per chunk.
    buffer_size: usize,
}

278impl<R: AsyncRead + Unpin> DecompressingStream<R> {
279    /// Create a new decompressing stream
280    ///
281    /// # Arguments
282    ///
283    /// * `reader` - The source to read compressed data from
284    /// * `algorithm` - The compression algorithm that was used
285    pub fn new(reader: R, algorithm: CompressionAlgorithm) -> Result<Self> {
286        Ok(Self {
287            reader,
288            algorithm,
289            buffer: BytesMut::with_capacity(DEFAULT_BUFFER_SIZE),
290            decompressed_buffer: Cursor::new(Vec::new()),
291            stats: StreamingStats::default(),
292            finished: false,
293            buffer_size: DEFAULT_BUFFER_SIZE,
294        })
295    }
296
297    /// Create a new decompressing stream with a custom buffer size
298    ///
299    /// # Arguments
300    ///
301    /// * `reader` - The source to read compressed data from
302    /// * `algorithm` - The compression algorithm that was used
303    /// * `buffer_size` - Size of the internal buffer in bytes
304    pub fn with_buffer_size(
305        reader: R,
306        algorithm: CompressionAlgorithm,
307        buffer_size: usize,
308    ) -> Result<Self> {
309        Ok(Self {
310            reader,
311            algorithm,
312            buffer: BytesMut::with_capacity(buffer_size),
313            decompressed_buffer: Cursor::new(Vec::new()),
314            stats: StreamingStats::default(),
315            finished: false,
316            buffer_size,
317        })
318    }
319
320    /// Get statistics about the decompression operation
321    pub fn stats(&self) -> &StreamingStats {
322        &self.stats
323    }
324}
325
326impl<R: AsyncRead + Unpin> AsyncRead for DecompressingStream<R> {
327    fn poll_read(
328        mut self: Pin<&mut Self>,
329        cx: &mut Context<'_>,
330        buf: &mut ReadBuf<'_>,
331    ) -> Poll<std::io::Result<()>> {
332        // Try to read from decompressed buffer first
333        let pos = self.decompressed_buffer.position() as usize;
334        let available = self.decompressed_buffer.get_ref().len() - pos;
335
336        if available > 0 {
337            let to_copy = available.min(buf.remaining());
338            buf.put_slice(&self.decompressed_buffer.get_ref()[pos..pos + to_copy]);
339            self.decompressed_buffer
340                .set_position((pos + to_copy) as u64);
341            return Poll::Ready(Ok(()));
342        }
343
344        // If finished and no more data, return EOF
345        if self.finished {
346            return Poll::Ready(Ok(()));
347        }
348
349        // Need to read more data from source
350        // Get self as mut ref to avoid borrow checker issues
351        let this = &mut *self;
352
353        this.buffer.resize(this.buffer_size, 0);
354        let mut read_buf = ReadBuf::new(&mut this.buffer[..]);
355
356        match Pin::new(&mut this.reader).poll_read(cx, &mut read_buf) {
357            Poll::Ready(Ok(())) => {
358                let n = read_buf.filled().len();
359
360                if n == 0 {
361                    this.finished = true;
362                    return Poll::Ready(Ok(()));
363                }
364
365                this.stats.bytes_read += n as u64;
366
367                // Decompress the data
368                let data = Bytes::from(this.buffer[..n].to_vec());
369                let decompressed = match crate::compression::decompress(&data, this.algorithm) {
370                    Ok(d) => d,
371                    Err(e) => return Poll::Ready(Err(std::io::Error::other(e.to_string()))),
372                };
373
374                this.stats.bytes_written += decompressed.len() as u64;
375                this.decompressed_buffer = Cursor::new(decompressed.to_vec());
376
377                // Now read from the decompressed buffer
378                let pos = this.decompressed_buffer.position() as usize;
379                let available = this.decompressed_buffer.get_ref().len() - pos;
380
381                if available > 0 {
382                    let to_copy = available.min(buf.remaining());
383                    buf.put_slice(&this.decompressed_buffer.get_ref()[pos..pos + to_copy]);
384                    this.decompressed_buffer
385                        .set_position((pos + to_copy) as u64);
386                }
387
388                Poll::Ready(Ok(()))
389            }
390            Poll::Ready(Err(e)) => Poll::Ready(Err(e)),
391            Poll::Pending => Poll::Pending,
392        }
393    }
394}
395
/// Statistics for streaming compression/decompression
///
/// Counters accumulate over the lifetime of the stream that owns them.
#[derive(Debug, Clone, Default)]
pub struct StreamingStats {
    /// Total bytes read from source
    pub bytes_read: u64,
    /// Total bytes written (compressed/decompressed)
    pub bytes_written: u64,
}

405impl StreamingStats {
406    /// Calculate the compression ratio (compressed_size / original_size)
407    ///
408    /// Returns 1.0 if no data has been processed.
409    pub fn compression_ratio(&self) -> f64 {
410        if self.bytes_read == 0 {
411            1.0
412        } else {
413            self.bytes_written as f64 / self.bytes_read as f64
414        }
415    }
416
417    /// Calculate space saved (in bytes)
418    pub fn bytes_saved(&self) -> i64 {
419        self.bytes_read as i64 - self.bytes_written as i64
420    }
421
422    /// Calculate space saved as a percentage
423    pub fn savings_percent(&self) -> f64 {
424        if self.bytes_read == 0 {
425            0.0
426        } else {
427            (self.bytes_saved() as f64 / self.bytes_read as f64) * 100.0
428        }
429    }
430}
431
#[cfg(test)]
mod tests {
    use super::*;
    use tokio::io::AsyncReadExt;

    // Zstd compression of repetitive data should shrink it and report
    // accurate stats.
    #[tokio::test]
    async fn test_compressing_stream_zstd() {
        let data = b"Hello, world! ".repeat(100);
        let cursor = std::io::Cursor::new(data.clone());

        let mut stream = CompressingStream::new(cursor, CompressionAlgorithm::Zstd, 3).unwrap();

        let mut compressed = Vec::new();
        stream.read_to_end(&mut compressed).await.unwrap();

        assert!(compressed.len() < data.len());
        let stats = stream.stats();
        assert_eq!(stats.bytes_read, data.len() as u64);
        assert!(stats.compression_ratio() < 1.0);
    }

    // LZ4 should also shrink repetitive data.
    #[tokio::test]
    async fn test_compressing_stream_lz4() {
        let data = b"Test data for LZ4 compression! ".repeat(100);
        let cursor = std::io::Cursor::new(data.clone());

        let mut stream = CompressingStream::new(cursor, CompressionAlgorithm::Lz4, 5).unwrap();

        let mut compressed = Vec::new();
        stream.read_to_end(&mut compressed).await.unwrap();

        assert!(compressed.len() < data.len());
    }

    // The None algorithm must pass data through unchanged.
    #[tokio::test]
    async fn test_compressing_stream_none() {
        let data = b"No compression applied".repeat(10);
        let cursor = std::io::Cursor::new(data.clone());

        let mut stream = CompressingStream::new(cursor, CompressionAlgorithm::None, 0).unwrap();

        let mut output = Vec::new();
        stream.read_to_end(&mut output).await.unwrap();

        assert_eq!(output, data);
        let stats = stream.stats();
        assert_eq!(stats.compression_ratio(), 1.0);
    }

    // Compress then decompress must reproduce the original bytes.
    #[tokio::test]
    async fn test_decompressing_stream_roundtrip() {
        let original = b"Roundtrip test data! ".repeat(100);

        // Compress
        let cursor = std::io::Cursor::new(original.clone());
        let mut compressor = CompressingStream::new(cursor, CompressionAlgorithm::Zstd, 5).unwrap();

        let mut compressed = Vec::new();
        compressor.read_to_end(&mut compressed).await.unwrap();

        // Decompress
        let cursor = std::io::Cursor::new(compressed);
        let mut decompressor =
            DecompressingStream::new(cursor, CompressionAlgorithm::Zstd).unwrap();

        let mut decompressed = Vec::new();
        decompressor.read_to_end(&mut decompressed).await.unwrap();

        assert_eq!(original, decompressed.as_slice());
    }

    // All derived statistics should agree for a highly compressible input.
    #[tokio::test]
    async fn test_streaming_stats() {
        let data = vec![0u8; 10000];
        let cursor = std::io::Cursor::new(data.clone());

        let mut stream = CompressingStream::new(cursor, CompressionAlgorithm::Zstd, 6).unwrap();

        let mut compressed = Vec::new();
        stream.read_to_end(&mut compressed).await.unwrap();

        let stats = stream.stats();
        assert_eq!(stats.bytes_read, 10000);
        assert!(stats.bytes_written < 10000);
        assert!(stats.compression_ratio() < 1.0);
        assert!(stats.bytes_saved() > 0);
        assert!(stats.savings_percent() > 0.0);
    }

    // A small custom buffer (forcing multiple chunks) must still compress.
    #[tokio::test]
    async fn test_custom_buffer_size() {
        let data = b"Custom buffer size test".repeat(50);
        let cursor = std::io::Cursor::new(data.clone());

        let mut stream =
            CompressingStream::with_buffer_size(cursor, CompressionAlgorithm::Lz4, 3, 1024)
                .unwrap();

        let mut compressed = Vec::new();
        stream.read_to_end(&mut compressed).await.unwrap();

        assert!(compressed.len() < data.len());
    }

    // Levels above 9 are rejected at construction time.
    #[tokio::test]
    async fn test_invalid_compression_level() {
        let data = b"test";
        let cursor = std::io::Cursor::new(data);

        let result = CompressingStream::new(cursor, CompressionAlgorithm::Zstd, 10);
        assert!(result.is_err());
    }

    // An empty source yields empty output and zeroed stats.
    #[tokio::test]
    async fn test_empty_stream() {
        let data: Vec<u8> = vec![];
        let cursor = std::io::Cursor::new(data);

        let mut stream = CompressingStream::new(cursor, CompressionAlgorithm::Zstd, 3).unwrap();

        let mut compressed = Vec::new();
        stream.read_to_end(&mut compressed).await.unwrap();

        let stats = stream.stats();
        assert_eq!(stats.bytes_read, 0);
        assert_eq!(stats.bytes_written, 0);
    }

    // 1MB of a single repeated byte should compress by more than 10x and
    // exercise multiple internal chunks.
    #[tokio::test]
    async fn test_large_data_streaming() {
        // Test with 1MB of repetitive data
        let data = vec![42u8; 1024 * 1024];
        let cursor = std::io::Cursor::new(data.clone());

        let mut stream = CompressingStream::new(cursor, CompressionAlgorithm::Zstd, 9).unwrap();

        let mut compressed = Vec::new();
        stream.read_to_end(&mut compressed).await.unwrap();

        // Should compress very well due to repetitive data
        assert!(compressed.len() < data.len() / 10);

        let stats = stream.stats();
        assert_eq!(stats.bytes_read, 1024 * 1024);
        assert!(stats.compression_ratio() < 0.1);
    }

    // Decompressor stats count compressed bytes in and original bytes out.
    #[tokio::test]
    async fn test_decompression_stats() {
        let original = vec![1u8; 5000];

        // Compress
        let cursor = std::io::Cursor::new(original.clone());
        let mut compressor = CompressingStream::new(cursor, CompressionAlgorithm::Lz4, 5).unwrap();

        let mut compressed = Vec::new();
        compressor.read_to_end(&mut compressed).await.unwrap();

        // Decompress
        let cursor = std::io::Cursor::new(compressed.clone());
        let mut decompressor = DecompressingStream::new(cursor, CompressionAlgorithm::Lz4).unwrap();

        let mut decompressed = Vec::new();
        decompressor.read_to_end(&mut decompressed).await.unwrap();

        let stats = decompressor.stats();
        assert_eq!(stats.bytes_read, compressed.len() as u64);
        assert_eq!(stats.bytes_written, original.len() as u64);
        assert!(stats.compression_ratio() > 1.0); // Decompression expands
    }
}