tako/plugins/compression/
deflate_stream.rs

1//! DEFLATE compression streaming utilities for efficient HTTP response compression.
2//!
3//! This module provides streaming DEFLATE compression for HTTP response bodies using the
4//! flate2 crate. DEFLATE compression offers good compression ratios with fast processing
5//! speeds, making it suitable for real-time web content compression. The streaming
6//! implementation enables memory-efficient compression of large responses without
7//! buffering entire content in memory.
8//!
9//! # Examples
10//!
11//! ```rust
12//! use tako::plugins::compression::deflate_stream::stream_deflate;
13//! use http_body_util::Full;
14//! use bytes::Bytes;
15//!
16//! // Compress a response body with DEFLATE level 6
17//! let body = Full::from(Bytes::from("Hello, World! This is test content."));
18//! let compressed = stream_deflate(body, 6);
19//!
20//! // Fast compression for dynamic content
21//! let dynamic_content = Full::from(Bytes::from("API response data..."));
22//! let fast_compressed = stream_deflate(dynamic_content, 1);
23//! ```
24
25use std::{
26  io::Write,
27  pin::Pin,
28  task::{Context, Poll},
29};
30
31use anyhow::Result;
32use bytes::Bytes;
33use flate2::{Compression, write::DeflateEncoder};
34use futures_util::{Stream, TryStreamExt};
35use http_body::{Body, Frame};
36use http_body_util::BodyExt;
37use pin_project_lite::pin_project;
38
39use crate::{body::TakoBody, types::BoxError};
40
41/// Compresses an HTTP body stream using the DEFLATE compression algorithm.
42pub fn stream_deflate<B>(body: B, level: u32) -> TakoBody
43where
44  B: Body<Data = Bytes, Error = BoxError> + Send + 'static,
45{
46  let upstream = body.into_data_stream();
47  let deflate = DeflateStream::new(upstream, level).map_ok(Frame::data);
48  TakoBody::from_try_stream(deflate)
49}
50
51pin_project! {
52    /// Streaming DEFLATE compressor that wraps an inner data stream.
53    pub struct DeflateStream<S> {
54        #[pin] inner: S,
55        encoder: DeflateEncoder<Vec<u8>>,
56        pos: usize,
57        done: bool,
58    }
59}
60
61impl<S> DeflateStream<S> {
62  /// Creates a new DEFLATE compression stream with the specified compression level.
63  pub fn new(inner: S, level: u32) -> Self {
64    Self {
65      inner,
66      encoder: DeflateEncoder::new(Vec::new(), Compression::new(level)),
67      pos: 0,
68      done: false,
69    }
70  }
71}
72
73impl<S> Stream for DeflateStream<S>
74where
75  S: Stream<Item = Result<Bytes, BoxError>>,
76{
77  type Item = Result<Bytes, BoxError>;
78
79  /// Polls the stream for the next compressed data chunk.
80  fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
81    let mut this = self.project();
82
83    loop {
84      // If there is data in the encoder's buffer, return it.
85      if *this.pos < this.encoder.get_ref().len() {
86        let buf = &this.encoder.get_ref()[*this.pos..];
87        *this.pos = this.encoder.get_ref().len();
88        return Poll::Ready(Some(Ok(Bytes::copy_from_slice(buf))));
89      }
90
91      // If the stream is done, return None to indicate completion.
92      if *this.done {
93        return Poll::Ready(None);
94      }
95
96      // Poll the inner stream for the next chunk of data.
97      match this.inner.as_mut().poll_next(cx) {
98        Poll::Ready(Some(Ok(chunk))) => {
99          // Compress the chunk and flush the encoder.
100          if let Err(e) = this
101            .encoder
102            .write_all(&chunk)
103            .and_then(|_| this.encoder.flush())
104          {
105            return Poll::Ready(Some(Err(e.into())));
106          }
107          continue;
108        }
109        Poll::Ready(Some(Err(e))) => {
110          // Propagate errors from the inner stream.
111          return Poll::Ready(Some(Err(e)));
112        }
113        Poll::Ready(None) => {
114          // Finalize the compression when the inner stream is finished.
115          *this.done = true;
116          if let Err(e) = this.encoder.try_finish() {
117            return Poll::Ready(Some(Err(e.into())));
118          }
119          continue;
120        }
121        Poll::Pending => {
122          // Indicate that the stream is not ready yet.
123          return Poll::Pending;
124        }
125      }
126    }
127  }
128}