1use crate::{CompressionAlgorithm, CompressionError, Result};
32use bytes::{Bytes, BytesMut};
33
34#[cfg(feature = "gzip")]
35use flate2::Compression as GzipCompression;
36#[cfg(feature = "gzip")]
37use flate2::write::GzEncoder;
38
39#[cfg(feature = "brotli")]
40use brotli::CompressorWriter as BrotliEncoder;
41
42#[cfg(feature = "zstd")]
43use zstd::stream::write::Encoder as ZstdEncoder;
44
45use std::io::Write;
46
/// Tuning knobs for [`StreamingCompressor`].
///
/// Start from [`StreamingConfig::new`] / [`Default`], or from one of the
/// presets ([`StreamingConfig::low_latency`], [`StreamingConfig::high_compression`]),
/// and adjust with the chained builder methods.
#[derive(Debug, Clone)]
pub struct StreamingConfig {
    /// Algorithm to use. `Auto` resolves to gzip when the `gzip` feature is
    /// compiled in and to no compression (passthrough) otherwise.
    pub algorithm: CompressionAlgorithm,
    /// Compression level. The compressor clamps it into each encoder's accepted
    /// range: 1..=9 for gzip, 0..=11 for brotli, 1..=22 for zstd.
    pub level: u32,
    /// Number of input bytes that may accumulate in the encoder before
    /// [`StreamingCompressor::compress_chunk`] forces a flush and returns output.
    pub flush_interval: usize,
    /// Minimum chunk size.
    /// NOTE(review): not read anywhere in this file — confirm whether another
    /// module honours it, or whether it is currently a no-op.
    pub min_chunk_size: usize,
    /// Initial capacity of the encoder's output buffer; for brotli it is also
    /// passed as the encoder's internal buffer size.
    pub buffer_size: usize,
}
61
62impl Default for StreamingConfig {
63 fn default() -> Self {
64 Self {
65 algorithm: CompressionAlgorithm::Auto,
66 level: 6,
67 flush_interval: 4096,
68 min_chunk_size: 64,
69 buffer_size: 8192,
70 }
71 }
72}
73
74impl StreamingConfig {
75 pub fn new() -> Self {
77 Self::default()
78 }
79
80 pub fn algorithm(mut self, algorithm: CompressionAlgorithm) -> Self {
82 self.algorithm = algorithm;
83 self
84 }
85
86 pub fn level(mut self, level: u32) -> Self {
88 self.level = level;
89 self
90 }
91
92 pub fn flush_interval(mut self, interval: usize) -> Self {
94 self.flush_interval = interval;
95 self
96 }
97
98 pub fn min_chunk_size(mut self, size: usize) -> Self {
100 self.min_chunk_size = size;
101 self
102 }
103
104 pub fn buffer_size(mut self, size: usize) -> Self {
106 self.buffer_size = size;
107 self
108 }
109
110 pub fn low_latency() -> Self {
112 Self {
113 algorithm: CompressionAlgorithm::Auto,
114 level: 1, flush_interval: 256, min_chunk_size: 16,
117 buffer_size: 1024,
118 }
119 }
120
121 pub fn high_compression() -> Self {
123 Self {
124 algorithm: CompressionAlgorithm::Auto,
125 level: 9, flush_interval: 16384, min_chunk_size: 1024,
128 buffer_size: 32768,
129 }
130 }
131}
132
/// The live encoder behind a [`StreamingCompressor`].
///
/// `None` means passthrough (no compression). Every compressing variant writes
/// into an owned `Vec<u8>` that the compressor drains whenever it flushes.
// The encoder variants are far larger than `None`; clippy's
// `large_enum_variant` is silenced instead of boxing them — presumably
// acceptable because each compressor holds exactly one `EncoderState`.
#[allow(clippy::large_enum_variant)]
enum EncoderState {
    /// Passthrough: input is returned unchanged.
    None,
    #[cfg(feature = "gzip")]
    Gzip(GzEncoder<Vec<u8>>),
    #[cfg(feature = "brotli")]
    Brotli(BrotliEncoder<Vec<u8>>),
    #[cfg(feature = "zstd")]
    Zstd(ZstdEncoder<'static, Vec<u8>>),
}
144
/// Incremental compressor.
///
/// Feed input with `compress_chunk`, optionally force output with `flush`, and
/// call `finish` to emit the remaining data and the format trailer.
pub struct StreamingCompressor {
    /// Settings the encoder was built from; `flush_interval` is consulted per chunk.
    config: StreamingConfig,
    /// Active encoder, or passthrough.
    encoder: EncoderState,
    /// Total uncompressed bytes accepted so far.
    bytes_in: u64,
    /// Total bytes handed back to the caller so far.
    bytes_out: u64,
    /// Input bytes counted since the last flush; compared against
    /// `config.flush_interval`.
    unflushed_bytes: usize,
    /// Set once `finish` has run.
    finished: bool,
}
154
155impl StreamingCompressor {
156 pub fn new(config: StreamingConfig) -> Result<Self> {
158 let encoder = Self::create_encoder(&config)?;
159
160 Ok(Self {
161 config,
162 encoder,
163 bytes_in: 0,
164 bytes_out: 0,
165 unflushed_bytes: 0,
166 finished: false,
167 })
168 }
169
170 pub fn from_accept_encoding(accept_encoding: &str, config: StreamingConfig) -> Result<Self> {
172 let algorithm = CompressionAlgorithm::select_from_accept_encoding(accept_encoding);
173 let config = StreamingConfig {
174 algorithm,
175 ..config
176 };
177 Self::new(config)
178 }
179
180 fn create_encoder(config: &StreamingConfig) -> Result<EncoderState> {
181 let algorithm = match config.algorithm {
182 CompressionAlgorithm::Auto => {
183 #[cfg(feature = "gzip")]
185 {
186 CompressionAlgorithm::Gzip
187 }
188 #[cfg(not(feature = "gzip"))]
189 {
190 CompressionAlgorithm::None
191 }
192 }
193 other => other,
194 };
195
196 match algorithm {
197 CompressionAlgorithm::None | CompressionAlgorithm::Auto => Ok(EncoderState::None),
198
199 #[cfg(feature = "gzip")]
200 CompressionAlgorithm::Gzip => {
201 let level = config.level.clamp(1, 9);
202 let encoder = GzEncoder::new(
203 Vec::with_capacity(config.buffer_size),
204 GzipCompression::new(level),
205 );
206 Ok(EncoderState::Gzip(encoder))
207 }
208
209 #[cfg(feature = "brotli")]
210 CompressionAlgorithm::Brotli => {
211 let level = config.level.clamp(0, 11);
212 let encoder = BrotliEncoder::new(
213 Vec::with_capacity(config.buffer_size),
214 config.buffer_size,
215 level,
216 22, );
218 Ok(EncoderState::Brotli(encoder))
219 }
220
221 #[cfg(feature = "zstd")]
222 CompressionAlgorithm::Zstd => {
223 let level = config.level.clamp(1, 22) as i32;
224 let encoder = ZstdEncoder::new(Vec::with_capacity(config.buffer_size), level)
225 .map_err(|e| CompressionError::CompressionFailed(e.to_string()))?;
226 Ok(EncoderState::Zstd(encoder))
227 }
228
229 #[allow(unreachable_patterns)]
230 _ => Err(CompressionError::UnsupportedAlgorithm(format!(
231 "{:?} not available",
232 algorithm
233 ))),
234 }
235 }
236
237 pub fn encoding(&self) -> Option<&'static str> {
239 match &self.encoder {
240 EncoderState::None => None,
241 #[cfg(feature = "gzip")]
242 EncoderState::Gzip(_) => Some("gzip"),
243 #[cfg(feature = "brotli")]
244 EncoderState::Brotli(_) => Some("br"),
245 #[cfg(feature = "zstd")]
246 EncoderState::Zstd(_) => Some("zstd"),
247 }
248 }
249
250 pub fn compress_chunk(&mut self, data: &[u8]) -> Result<Bytes> {
254 if self.finished {
255 return Err(CompressionError::CompressionFailed(
256 "Compressor already finished".to_string(),
257 ));
258 }
259
260 if data.is_empty() {
261 return Ok(Bytes::new());
262 }
263
264 self.bytes_in += data.len() as u64;
265 self.unflushed_bytes += data.len();
266
267 match &mut self.encoder {
268 EncoderState::None => {
269 self.bytes_out += data.len() as u64;
271 Ok(Bytes::copy_from_slice(data))
272 }
273
274 #[cfg(feature = "gzip")]
275 EncoderState::Gzip(encoder) => {
276 encoder
277 .write_all(data)
278 .map_err(|e| CompressionError::CompressionFailed(e.to_string()))?;
279
280 if self.unflushed_bytes >= self.config.flush_interval {
282 self.flush_internal()
283 } else {
284 Ok(Bytes::new())
285 }
286 }
287
288 #[cfg(feature = "brotli")]
289 EncoderState::Brotli(encoder) => {
290 encoder
291 .write_all(data)
292 .map_err(|e| CompressionError::CompressionFailed(e.to_string()))?;
293
294 if self.unflushed_bytes >= self.config.flush_interval {
295 self.flush_internal()
296 } else {
297 Ok(Bytes::new())
298 }
299 }
300
301 #[cfg(feature = "zstd")]
302 EncoderState::Zstd(encoder) => {
303 encoder
304 .write_all(data)
305 .map_err(|e| CompressionError::CompressionFailed(e.to_string()))?;
306
307 if self.unflushed_bytes >= self.config.flush_interval {
308 self.flush_internal()
309 } else {
310 Ok(Bytes::new())
311 }
312 }
313 }
314 }
315
316 pub fn flush(&mut self) -> Result<Bytes> {
318 self.flush_internal()
319 }
320
321 fn flush_internal(&mut self) -> Result<Bytes> {
322 self.unflushed_bytes = 0;
323
324 match &mut self.encoder {
325 EncoderState::None => Ok(Bytes::new()),
326
327 #[cfg(feature = "gzip")]
328 EncoderState::Gzip(encoder) => {
329 encoder
330 .flush()
331 .map_err(|e| CompressionError::CompressionFailed(e.to_string()))?;
332 let inner = encoder.get_mut();
333 if inner.is_empty() {
334 return Ok(Bytes::new());
335 }
336 let output = std::mem::take(inner);
337 self.bytes_out += output.len() as u64;
338 Ok(Bytes::from(output))
339 }
340
341 #[cfg(feature = "brotli")]
342 EncoderState::Brotli(encoder) => {
343 encoder
344 .flush()
345 .map_err(|e| CompressionError::CompressionFailed(e.to_string()))?;
346 let inner = encoder.get_mut();
347 if inner.is_empty() {
348 return Ok(Bytes::new());
349 }
350 let output = std::mem::take(inner);
351 self.bytes_out += output.len() as u64;
352 Ok(Bytes::from(output))
353 }
354
355 #[cfg(feature = "zstd")]
356 EncoderState::Zstd(encoder) => {
357 encoder
358 .flush()
359 .map_err(|e| CompressionError::CompressionFailed(e.to_string()))?;
360 let inner = encoder.get_mut();
361 if inner.is_empty() {
362 return Ok(Bytes::new());
363 }
364 let output = std::mem::take(inner);
365 self.bytes_out += output.len() as u64;
366 Ok(Bytes::from(output))
367 }
368 }
369 }
370
371 pub fn finish(mut self) -> Result<Bytes> {
373 if self.finished {
374 return Ok(Bytes::new());
375 }
376 self.finished = true;
377
378 match self.encoder {
379 EncoderState::None => Ok(Bytes::new()),
380
381 #[cfg(feature = "gzip")]
382 EncoderState::Gzip(encoder) => {
383 let output = encoder
384 .finish()
385 .map_err(|e| CompressionError::CompressionFailed(e.to_string()))?;
386 self.bytes_out += output.len() as u64;
387 Ok(Bytes::from(output))
388 }
389
390 #[cfg(feature = "brotli")]
391 EncoderState::Brotli(mut encoder) => {
392 encoder
393 .flush()
394 .map_err(|e| CompressionError::CompressionFailed(e.to_string()))?;
395 let output = encoder.into_inner();
396 self.bytes_out += output.len() as u64;
397 Ok(Bytes::from(output))
398 }
399
400 #[cfg(feature = "zstd")]
401 EncoderState::Zstd(encoder) => {
402 let output = encoder
403 .finish()
404 .map_err(|e| CompressionError::CompressionFailed(e.to_string()))?;
405 self.bytes_out += output.len() as u64;
406 Ok(Bytes::from(output))
407 }
408 }
409 }
410
411 pub fn stats(&self) -> CompressionStats {
413 CompressionStats {
414 bytes_in: self.bytes_in,
415 bytes_out: self.bytes_out,
416 ratio: if self.bytes_in > 0 {
417 self.bytes_out as f64 / self.bytes_in as f64
418 } else {
419 1.0
420 },
421 }
422 }
423}
424
/// Byte totals for a compression stream, as reported by
/// [`StreamingCompressor::stats`].
#[derive(Debug, Clone, Copy)]
pub struct CompressionStats {
    /// Uncompressed bytes fed to the compressor.
    pub bytes_in: u64,
    /// Bytes the compressor has returned to the caller.
    pub bytes_out: u64,
    /// `bytes_out / bytes_in` as computed by `StreamingCompressor::stats`,
    /// which uses `1.0` when no input has been seen.
    pub ratio: f64,
}
435
436impl CompressionStats {
437 pub fn savings_percent(&self) -> f64 {
439 if self.bytes_in == 0 {
440 return 0.0;
441 }
442 (1.0 - self.ratio) * 100.0
443 }
444}
445
/// Async-facing wrapper around [`StreamingCompressor`].
///
/// Its `async` methods delegate straight to the synchronous compressor and
/// never await anything; they exist so the compressor can be driven from
/// async code.
pub struct AsyncStreamingCompressor {
    /// The compressor all calls delegate to.
    inner: StreamingCompressor,
    // Never read in this file; the `dead_code` allow keeps the build warning-free.
    // NOTE(review): presumably reserved for future buffering — confirm or remove.
    #[allow(dead_code)]
    pending: BytesMut,
}
454
455impl AsyncStreamingCompressor {
456 pub fn new(config: StreamingConfig) -> Result<Self> {
458 Ok(Self {
459 inner: StreamingCompressor::new(config)?,
460 pending: BytesMut::with_capacity(8192),
461 })
462 }
463
464 pub fn encoding(&self) -> Option<&'static str> {
466 self.inner.encoding()
467 }
468
469 pub async fn process(&mut self, chunk: Bytes) -> Result<Bytes> {
471 self.inner.compress_chunk(&chunk)
472 }
473
474 pub async fn flush(&mut self) -> Result<Bytes> {
476 self.inner.flush()
477 }
478
479 pub fn finish(self) -> Result<Bytes> {
481 self.inner.finish()
482 }
483
484 pub fn stats(&self) -> CompressionStats {
486 self.inner.stats()
487 }
488}
489
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn test_streaming_config() {
        let config = StreamingConfig::new()
            .algorithm(CompressionAlgorithm::None)
            .level(6)
            .flush_interval(1024);

        assert_eq!(config.level, 6);
        assert_eq!(config.flush_interval, 1024);
    }

    #[test]
    fn test_passthrough_compressor() {
        let config = StreamingConfig::new().algorithm(CompressionAlgorithm::None);
        let mut compressor = StreamingCompressor::new(config).unwrap();
        assert_eq!(compressor.encoding(), None);

        let data = b"Hello, World!";
        let compressed = compressor.compress_chunk(data).unwrap();
        assert_eq!(compressed.as_ref(), data);

        // Passthrough has nothing buffered, so an explicit flush yields nothing.
        assert!(compressor.flush().unwrap().is_empty());

        let stats = compressor.stats();
        assert_eq!(stats.bytes_in, data.len() as u64);
        assert_eq!(stats.bytes_out, data.len() as u64);

        let final_chunk = compressor.finish().unwrap();
        assert!(final_chunk.is_empty());
    }

    #[test]
    fn test_empty_chunk_is_noop() {
        let config = StreamingConfig::new().algorithm(CompressionAlgorithm::None);
        let mut compressor = StreamingCompressor::new(config).unwrap();

        assert!(compressor.compress_chunk(b"").unwrap().is_empty());

        let stats = compressor.stats();
        assert_eq!(stats.bytes_in, 0);
        assert_eq!(stats.bytes_out, 0);
        assert_eq!(stats.ratio, 1.0);
        assert_eq!(stats.savings_percent(), 0.0);
    }

    #[test]
    #[cfg(feature = "gzip")]
    fn test_auto_prefers_gzip() {
        let compressor = StreamingCompressor::new(StreamingConfig::new()).unwrap();
        assert_eq!(compressor.encoding(), Some("gzip"));
    }

    #[test]
    #[cfg(feature = "gzip")]
    fn test_gzip_streaming() {
        use flate2::read::GzDecoder;
        use std::io::Read;

        // 36 bytes per chunk.
        let chunk: &[u8] = b"Hello, World! This is a test chunk.\n";

        // A flush interval smaller than one chunk forces a flush after every
        // write, exercising the drain-on-flush path as well as `finish`.
        let config = StreamingConfig::new()
            .algorithm(CompressionAlgorithm::Gzip)
            .flush_interval(10);
        let mut compressor = StreamingCompressor::new(config).unwrap();
        assert_eq!(compressor.encoding(), Some("gzip"));

        let mut total_compressed = BytesMut::new();
        for _ in 0..10 {
            let compressed = compressor.compress_chunk(chunk).unwrap();
            total_compressed.extend_from_slice(&compressed);
        }

        let original = chunk.repeat(10);
        assert_eq!(compressor.stats().bytes_in, original.len() as u64);

        let final_chunk = compressor.finish().unwrap();
        total_compressed.extend_from_slice(&final_chunk);
        assert!(total_compressed.len() < original.len());

        // The concatenated output chunks must form one valid gzip stream that
        // decodes back to the exact input.
        let mut decoded = Vec::new();
        GzDecoder::new(&total_compressed[..])
            .read_to_end(&mut decoded)
            .unwrap();
        assert_eq!(decoded, original);
    }

    #[test]
    fn test_compression_stats() {
        let stats = CompressionStats {
            bytes_in: 1000,
            bytes_out: 400,
            ratio: 0.4,
        };

        // Compare with a tolerance: the value comes from float arithmetic.
        assert!((stats.savings_percent() - 60.0).abs() < 1e-9);

        let empty = CompressionStats {
            bytes_in: 0,
            bytes_out: 0,
            ratio: 1.0,
        };
        assert_eq!(empty.savings_percent(), 0.0);
    }

    #[test]
    fn test_low_latency_config() {
        let config = StreamingConfig::low_latency();
        assert_eq!(config.level, 1);
        assert_eq!(config.flush_interval, 256);
    }

    #[test]
    fn test_high_compression_config() {
        let config = StreamingConfig::high_compression();
        assert_eq!(config.level, 9);
        assert!(config.flush_interval > 1024);
    }
}