armature_compression/
streaming.rs

1//! Streaming Compression - Compress chunks as they're generated
2//!
3//! This module provides streaming compression that compresses data incrementally
4//! as it's produced, rather than buffering the entire response.
5//!
6//! # Features
7//!
8//! - Incremental compression for streaming responses
9//! - Multiple algorithm support (gzip, brotli, zstd)
10//! - Configurable flush intervals
11//! - Integration with SSE and chunked responses
12//!
13//! # Example
14//!
15//! ```rust,ignore
//! use armature_compression::CompressionAlgorithm;
//! use armature_compression::streaming::{StreamingCompressor, StreamingConfig};
17//!
18//! let config = StreamingConfig::new()
19//!     .algorithm(CompressionAlgorithm::Gzip)
20//!     .flush_interval(1024);
21//!
22//! let mut compressor = StreamingCompressor::new(config)?;
23//!
24//! // Compress chunks as they arrive
25//! let compressed = compressor.compress_chunk(data)?;
26//!
27//! // Finish compression
28//! let final_chunk = compressor.finish()?;
29//! ```
30
31use crate::{CompressionAlgorithm, CompressionError, Result};
32use bytes::{Bytes, BytesMut};
33
34#[cfg(feature = "gzip")]
35use flate2::Compression as GzipCompression;
36#[cfg(feature = "gzip")]
37use flate2::write::GzEncoder;
38
39#[cfg(feature = "brotli")]
40use brotli::CompressorWriter as BrotliEncoder;
41
42#[cfg(feature = "zstd")]
43use zstd::stream::write::Encoder as ZstdEncoder;
44
45use std::io::Write;
46
47/// Configuration for streaming compression.
48#[derive(Debug, Clone)]
49pub struct StreamingConfig {
50    /// Compression algorithm to use
51    pub algorithm: CompressionAlgorithm,
52    /// Compression level (algorithm-specific)
53    pub level: u32,
54    /// Flush after this many bytes (0 = auto)
55    pub flush_interval: usize,
56    /// Minimum chunk size before compressing
57    pub min_chunk_size: usize,
58    /// Buffer size for compression output
59    pub buffer_size: usize,
60}
61
62impl Default for StreamingConfig {
63    fn default() -> Self {
64        Self {
65            algorithm: CompressionAlgorithm::Auto,
66            level: 6,
67            flush_interval: 4096,
68            min_chunk_size: 64,
69            buffer_size: 8192,
70        }
71    }
72}
73
74impl StreamingConfig {
75    /// Create new streaming config.
76    pub fn new() -> Self {
77        Self::default()
78    }
79
80    /// Set compression algorithm.
81    pub fn algorithm(mut self, algorithm: CompressionAlgorithm) -> Self {
82        self.algorithm = algorithm;
83        self
84    }
85
86    /// Set compression level.
87    pub fn level(mut self, level: u32) -> Self {
88        self.level = level;
89        self
90    }
91
92    /// Set flush interval (bytes).
93    pub fn flush_interval(mut self, interval: usize) -> Self {
94        self.flush_interval = interval;
95        self
96    }
97
98    /// Set minimum chunk size.
99    pub fn min_chunk_size(mut self, size: usize) -> Self {
100        self.min_chunk_size = size;
101        self
102    }
103
104    /// Set buffer size.
105    pub fn buffer_size(mut self, size: usize) -> Self {
106        self.buffer_size = size;
107        self
108    }
109
110    /// Create config optimized for low latency streaming.
111    pub fn low_latency() -> Self {
112        Self {
113            algorithm: CompressionAlgorithm::Auto,
114            level: 1,            // Fastest
115            flush_interval: 256, // Frequent flushes
116            min_chunk_size: 16,
117            buffer_size: 1024,
118        }
119    }
120
121    /// Create config optimized for high compression ratio.
122    pub fn high_compression() -> Self {
123        Self {
124            algorithm: CompressionAlgorithm::Auto,
125            level: 9,              // Best compression
126            flush_interval: 16384, // Less frequent flushes
127            min_chunk_size: 1024,
128            buffer_size: 32768,
129        }
130    }
131}
132
/// The concrete encoder backing a compressor.
///
/// Each compressing variant owns an encoder that writes into an in-memory
/// `Vec<u8>`; `None` means no encoder is present.
// The variants differ a lot in size, but a compressor holds exactly one value
// of this enum, so boxing the large ones would only add indirection.
#[allow(clippy::large_enum_variant)]
enum EncoderState {
    /// No encoder.
    None,
    /// Gzip encoder (requires the `gzip` feature).
    #[cfg(feature = "gzip")]
    Gzip(GzEncoder<Vec<u8>>),
    /// Brotli encoder (requires the `brotli` feature).
    #[cfg(feature = "brotli")]
    Brotli(BrotliEncoder<Vec<u8>>),
    /// Zstandard encoder (requires the `zstd` feature).
    #[cfg(feature = "zstd")]
    Zstd(ZstdEncoder<'static, Vec<u8>>),
}
144
145/// Streaming compressor that compresses chunks incrementally.
146pub struct StreamingCompressor {
147    config: StreamingConfig,
148    encoder: EncoderState,
149    bytes_in: u64,
150    bytes_out: u64,
151    unflushed_bytes: usize,
152    finished: bool,
153}
154
155impl StreamingCompressor {
156    /// Create a new streaming compressor.
157    pub fn new(config: StreamingConfig) -> Result<Self> {
158        let encoder = Self::create_encoder(&config)?;
159
160        Ok(Self {
161            config,
162            encoder,
163            bytes_in: 0,
164            bytes_out: 0,
165            unflushed_bytes: 0,
166            finished: false,
167        })
168    }
169
170    /// Create with specific algorithm selected from Accept-Encoding.
171    pub fn from_accept_encoding(accept_encoding: &str, config: StreamingConfig) -> Result<Self> {
172        let algorithm = CompressionAlgorithm::select_from_accept_encoding(accept_encoding);
173        let config = StreamingConfig {
174            algorithm,
175            ..config
176        };
177        Self::new(config)
178    }
179
180    fn create_encoder(config: &StreamingConfig) -> Result<EncoderState> {
181        let algorithm = match config.algorithm {
182            CompressionAlgorithm::Auto => {
183                // Default to gzip for streaming (best compatibility)
184                #[cfg(feature = "gzip")]
185                {
186                    CompressionAlgorithm::Gzip
187                }
188                #[cfg(not(feature = "gzip"))]
189                {
190                    CompressionAlgorithm::None
191                }
192            }
193            other => other,
194        };
195
196        match algorithm {
197            CompressionAlgorithm::None | CompressionAlgorithm::Auto => Ok(EncoderState::None),
198
199            #[cfg(feature = "gzip")]
200            CompressionAlgorithm::Gzip => {
201                let level = config.level.clamp(1, 9);
202                let encoder = GzEncoder::new(
203                    Vec::with_capacity(config.buffer_size),
204                    GzipCompression::new(level),
205                );
206                Ok(EncoderState::Gzip(encoder))
207            }
208
209            #[cfg(feature = "brotli")]
210            CompressionAlgorithm::Brotli => {
211                let level = config.level.clamp(0, 11);
212                let encoder = BrotliEncoder::new(
213                    Vec::with_capacity(config.buffer_size),
214                    config.buffer_size,
215                    level,
216                    22, // lgwin
217                );
218                Ok(EncoderState::Brotli(encoder))
219            }
220
221            #[cfg(feature = "zstd")]
222            CompressionAlgorithm::Zstd => {
223                let level = config.level.clamp(1, 22) as i32;
224                let encoder = ZstdEncoder::new(Vec::with_capacity(config.buffer_size), level)
225                    .map_err(|e| CompressionError::CompressionFailed(e.to_string()))?;
226                Ok(EncoderState::Zstd(encoder))
227            }
228
229            #[allow(unreachable_patterns)]
230            _ => Err(CompressionError::UnsupportedAlgorithm(format!(
231                "{:?} not available",
232                algorithm
233            ))),
234        }
235    }
236
237    /// Get the Content-Encoding value for this compressor.
238    pub fn encoding(&self) -> Option<&'static str> {
239        match &self.encoder {
240            EncoderState::None => None,
241            #[cfg(feature = "gzip")]
242            EncoderState::Gzip(_) => Some("gzip"),
243            #[cfg(feature = "brotli")]
244            EncoderState::Brotli(_) => Some("br"),
245            #[cfg(feature = "zstd")]
246            EncoderState::Zstd(_) => Some("zstd"),
247        }
248    }
249
250    /// Compress a chunk of data.
251    ///
252    /// Returns compressed bytes. May return empty if data is being buffered.
253    pub fn compress_chunk(&mut self, data: &[u8]) -> Result<Bytes> {
254        if self.finished {
255            return Err(CompressionError::CompressionFailed(
256                "Compressor already finished".to_string(),
257            ));
258        }
259
260        if data.is_empty() {
261            return Ok(Bytes::new());
262        }
263
264        self.bytes_in += data.len() as u64;
265        self.unflushed_bytes += data.len();
266
267        match &mut self.encoder {
268            EncoderState::None => {
269                // Pass through
270                self.bytes_out += data.len() as u64;
271                Ok(Bytes::copy_from_slice(data))
272            }
273
274            #[cfg(feature = "gzip")]
275            EncoderState::Gzip(encoder) => {
276                encoder
277                    .write_all(data)
278                    .map_err(|e| CompressionError::CompressionFailed(e.to_string()))?;
279
280                // Flush if we've accumulated enough
281                if self.unflushed_bytes >= self.config.flush_interval {
282                    self.flush_internal()
283                } else {
284                    Ok(Bytes::new())
285                }
286            }
287
288            #[cfg(feature = "brotli")]
289            EncoderState::Brotli(encoder) => {
290                encoder
291                    .write_all(data)
292                    .map_err(|e| CompressionError::CompressionFailed(e.to_string()))?;
293
294                if self.unflushed_bytes >= self.config.flush_interval {
295                    self.flush_internal()
296                } else {
297                    Ok(Bytes::new())
298                }
299            }
300
301            #[cfg(feature = "zstd")]
302            EncoderState::Zstd(encoder) => {
303                encoder
304                    .write_all(data)
305                    .map_err(|e| CompressionError::CompressionFailed(e.to_string()))?;
306
307                if self.unflushed_bytes >= self.config.flush_interval {
308                    self.flush_internal()
309                } else {
310                    Ok(Bytes::new())
311                }
312            }
313        }
314    }
315
316    /// Flush compressed data without finishing.
317    pub fn flush(&mut self) -> Result<Bytes> {
318        self.flush_internal()
319    }
320
321    fn flush_internal(&mut self) -> Result<Bytes> {
322        self.unflushed_bytes = 0;
323
324        match &mut self.encoder {
325            EncoderState::None => Ok(Bytes::new()),
326
327            #[cfg(feature = "gzip")]
328            EncoderState::Gzip(encoder) => {
329                encoder
330                    .flush()
331                    .map_err(|e| CompressionError::CompressionFailed(e.to_string()))?;
332                let inner = encoder.get_mut();
333                if inner.is_empty() {
334                    return Ok(Bytes::new());
335                }
336                let output = std::mem::take(inner);
337                self.bytes_out += output.len() as u64;
338                Ok(Bytes::from(output))
339            }
340
341            #[cfg(feature = "brotli")]
342            EncoderState::Brotli(encoder) => {
343                encoder
344                    .flush()
345                    .map_err(|e| CompressionError::CompressionFailed(e.to_string()))?;
346                let inner = encoder.get_mut();
347                if inner.is_empty() {
348                    return Ok(Bytes::new());
349                }
350                let output = std::mem::take(inner);
351                self.bytes_out += output.len() as u64;
352                Ok(Bytes::from(output))
353            }
354
355            #[cfg(feature = "zstd")]
356            EncoderState::Zstd(encoder) => {
357                encoder
358                    .flush()
359                    .map_err(|e| CompressionError::CompressionFailed(e.to_string()))?;
360                let inner = encoder.get_mut();
361                if inner.is_empty() {
362                    return Ok(Bytes::new());
363                }
364                let output = std::mem::take(inner);
365                self.bytes_out += output.len() as u64;
366                Ok(Bytes::from(output))
367            }
368        }
369    }
370
371    /// Finish compression and return final bytes.
372    pub fn finish(mut self) -> Result<Bytes> {
373        if self.finished {
374            return Ok(Bytes::new());
375        }
376        self.finished = true;
377
378        match self.encoder {
379            EncoderState::None => Ok(Bytes::new()),
380
381            #[cfg(feature = "gzip")]
382            EncoderState::Gzip(encoder) => {
383                let output = encoder
384                    .finish()
385                    .map_err(|e| CompressionError::CompressionFailed(e.to_string()))?;
386                self.bytes_out += output.len() as u64;
387                Ok(Bytes::from(output))
388            }
389
390            #[cfg(feature = "brotli")]
391            EncoderState::Brotli(mut encoder) => {
392                encoder
393                    .flush()
394                    .map_err(|e| CompressionError::CompressionFailed(e.to_string()))?;
395                let output = encoder.into_inner();
396                self.bytes_out += output.len() as u64;
397                Ok(Bytes::from(output))
398            }
399
400            #[cfg(feature = "zstd")]
401            EncoderState::Zstd(encoder) => {
402                let output = encoder
403                    .finish()
404                    .map_err(|e| CompressionError::CompressionFailed(e.to_string()))?;
405                self.bytes_out += output.len() as u64;
406                Ok(Bytes::from(output))
407            }
408        }
409    }
410
411    /// Get compression statistics.
412    pub fn stats(&self) -> CompressionStats {
413        CompressionStats {
414            bytes_in: self.bytes_in,
415            bytes_out: self.bytes_out,
416            ratio: if self.bytes_in > 0 {
417                self.bytes_out as f64 / self.bytes_in as f64
418            } else {
419                1.0
420            },
421        }
422    }
423}
424
/// Byte counters describing how well a stream compressed.
#[derive(Clone, Copy, Debug)]
pub struct CompressionStats {
    /// Total bytes before compression.
    pub bytes_in: u64,
    /// Total bytes after compression.
    pub bytes_out: u64,
    /// Compression ratio (out/in, lower is better).
    pub ratio: f64,
}

impl CompressionStats {
    /// Space saved, as a percentage of the input size.
    ///
    /// Computed from `ratio` as `(1 - ratio) * 100`, so the value is negative
    /// when the output is larger than the input. Returns `0.0` when no input
    /// bytes were recorded.
    pub fn savings_percent(&self) -> f64 {
        match self.bytes_in {
            0 => 0.0,
            _ => (1.0 - self.ratio) * 100.0,
        }
    }
}
445
446/// Async streaming compressor wrapper.
447///
448/// Wraps a StreamingCompressor for use with async streams.
449pub struct AsyncStreamingCompressor {
450    inner: StreamingCompressor,
451    #[allow(dead_code)] // Reserved for buffering partial data
452    pending: BytesMut,
453}
454
455impl AsyncStreamingCompressor {
456    /// Create a new async streaming compressor.
457    pub fn new(config: StreamingConfig) -> Result<Self> {
458        Ok(Self {
459            inner: StreamingCompressor::new(config)?,
460            pending: BytesMut::with_capacity(8192),
461        })
462    }
463
464    /// Get the Content-Encoding header value.
465    pub fn encoding(&self) -> Option<&'static str> {
466        self.inner.encoding()
467    }
468
469    /// Process a chunk and return compressed data.
470    pub async fn process(&mut self, chunk: Bytes) -> Result<Bytes> {
471        self.inner.compress_chunk(&chunk)
472    }
473
474    /// Flush any pending compressed data.
475    pub async fn flush(&mut self) -> Result<Bytes> {
476        self.inner.flush()
477    }
478
479    /// Finish compression.
480    pub fn finish(self) -> Result<Bytes> {
481        self.inner.finish()
482    }
483
484    /// Get compression statistics.
485    pub fn stats(&self) -> CompressionStats {
486        self.inner.stats()
487    }
488}
489
#[cfg(test)]
mod tests {
    use super::*;

    /// Builder setters store the values they are given.
    #[test]
    fn test_streaming_config() {
        let config = StreamingConfig::new()
            .algorithm(CompressionAlgorithm::None)
            .level(6)
            .flush_interval(1024);

        assert_eq!(config.level, 6);
        assert_eq!(config.flush_interval, 1024);
    }

    /// Pass-through mode returns the input unchanged and counts it on both
    /// sides of the statistics.
    #[test]
    fn test_passthrough_compressor() {
        let config = StreamingConfig::new().algorithm(CompressionAlgorithm::None);

        let mut compressor = StreamingCompressor::new(config).unwrap();
        assert_eq!(compressor.encoding(), None);

        let data = b"Hello, World!";
        let compressed = compressor.compress_chunk(data).unwrap();

        assert_eq!(compressed.as_ref(), data);

        let stats = compressor.stats();
        assert_eq!(stats.bytes_in, data.len() as u64);
        assert_eq!(stats.bytes_out, data.len() as u64);

        let final_chunk = compressor.finish().unwrap();
        assert!(final_chunk.is_empty());
    }

    /// An empty chunk produces no output and leaves the counters untouched.
    #[test]
    fn test_empty_chunk_is_noop() {
        let config = StreamingConfig::new().algorithm(CompressionAlgorithm::None);
        let mut compressor = StreamingCompressor::new(config).unwrap();

        let out = compressor.compress_chunk(b"").unwrap();
        assert!(out.is_empty());

        let stats = compressor.stats();
        assert_eq!(stats.bytes_in, 0);
        assert_eq!(stats.bytes_out, 0);
    }

    /// Gzip output assembled from incremental flushes plus the final chunk is
    /// smaller than the input and decodes back to exactly that input.
    #[test]
    #[cfg(feature = "gzip")]
    fn test_gzip_streaming() {
        const CHUNK: &[u8] = b"Hello, World! This is a test chunk.\n";
        const CHUNKS: usize = 10;

        let config = StreamingConfig::new()
            .algorithm(CompressionAlgorithm::Gzip)
            .flush_interval(10); // Flush frequently for test

        let mut compressor = StreamingCompressor::new(config).unwrap();
        assert_eq!(compressor.encoding(), Some("gzip"));

        let mut total_compressed = BytesMut::new();

        // Send multiple chunks
        for _ in 0..CHUNKS {
            let compressed = compressor.compress_chunk(CHUNK).unwrap();
            total_compressed.extend_from_slice(&compressed);
        }

        // Get stats before finishing (finish consumes self)
        let stats = compressor.stats();
        assert_eq!(stats.bytes_in, (CHUNKS * CHUNK.len()) as u64);

        // Finish
        let final_chunk = compressor.finish().unwrap();
        total_compressed.extend_from_slice(&final_chunk);

        // Should be smaller than original
        let original_size = CHUNKS * CHUNK.len();
        assert!(total_compressed.len() < original_size);

        // The concatenated pieces must form one valid gzip stream that
        // round-trips to the original data.
        let mut decoder = flate2::read::GzDecoder::new(&total_compressed[..]);
        let mut decoded = Vec::new();
        std::io::Read::read_to_end(&mut decoder, &mut decoded).unwrap();
        assert_eq!(decoded, CHUNK.repeat(CHUNKS));
    }

    /// Savings are derived from the stored ratio.
    #[test]
    fn test_compression_stats() {
        let stats = CompressionStats {
            bytes_in: 1000,
            bytes_out: 400,
            ratio: 0.4,
        };

        assert_eq!(stats.savings_percent(), 60.0);
    }

    /// The low-latency preset uses the fastest level and a small flush interval.
    #[test]
    fn test_low_latency_config() {
        let config = StreamingConfig::low_latency();
        assert_eq!(config.level, 1);
        assert_eq!(config.flush_interval, 256);
    }

    /// The high-compression preset uses the best level and a large flush interval.
    #[test]
    fn test_high_compression_config() {
        let config = StreamingConfig::high_compression();
        assert_eq!(config.level, 9);
        assert!(config.flush_interval > 1024);
    }
}