tako/plugins/compression/
gzip_stream.rs

1//! Gzip compression streaming utilities for efficient HTTP response compression.
2//!
3//! This module provides streaming Gzip compression for HTTP response bodies using the
4//! flate2 crate. Gzip is one of the most widely supported compression formats on the web,
5//! offering excellent compatibility across all browsers and HTTP clients. The streaming
6//! implementation enables memory-efficient compression of large responses without
7//! buffering entire content in memory, making it ideal for real-time web applications.
8//!
9//! # Examples
10//!
11//! ```rust
12//! use tako::plugins::compression::gzip_stream::stream_gzip;
13//! use http_body_util::Full;
14//! use bytes::Bytes;
15//!
16//! // Compress a response body with Gzip level 6
17//! let body = Full::from(Bytes::from("Hello, World! This is test content."));
18//! let compressed = stream_gzip(body, 6);
19//!
20//! // Fast compression for dynamic API responses
21//! let api_response = Full::from(Bytes::from("JSON API data..."));
22//! let fast_compressed = stream_gzip(api_response, 1);
23//! ```
24
25use std::{
26  io::Write,
27  pin::Pin,
28  task::{Context, Poll},
29};
30
31use anyhow::Result;
32use bytes::Bytes;
33use flate2::{Compression, write::GzEncoder};
34use futures_util::{Stream, TryStreamExt};
35use http_body::{Body, Frame};
36use http_body_util::BodyExt;
37use pin_project_lite::pin_project;
38
39use crate::{body::TakoBody, types::BoxError};
40
41/// Compresses an HTTP body stream using Gzip compression algorithm.
42pub fn stream_gzip<B>(body: B, level: u32) -> TakoBody
43where
44  B: Body<Data = Bytes, Error = BoxError> + Send + 'static,
45{
46  let upstream = body.into_data_stream();
47  let gzip = GzipStream::new(upstream, level).map_ok(Frame::data);
48  TakoBody::from_try_stream(gzip)
49}
50
51pin_project! {
52    /// Streaming Gzip compressor that wraps an inner data stream.
53    pub struct GzipStream<S> {
54        #[pin] inner: S,
55        encoder: GzEncoder<Vec<u8>>,
56        pos: usize,
57        done: bool,
58    }
59}
60
61impl<S> GzipStream<S> {
62  /// Creates a new Gzip compression stream with the specified compression level.
63  fn new(stream: S, level: u32) -> Self {
64    Self {
65      inner: stream,
66      encoder: GzEncoder::new(Vec::new(), Compression::new(level)),
67      pos: 0,
68      done: false,
69    }
70  }
71}
72
73impl<S> Stream for GzipStream<S>
74where
75  S: Stream<Item = Result<Bytes, BoxError>>,
76{
77  type Item = Result<Bytes, BoxError>;
78
79  /// Polls the stream for the next compressed data chunk.
80  fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
81    let mut this = self.project();
82
83    loop {
84      // 1) Do we still have unread bytes in the encoder buffer?
85      if *this.pos < this.encoder.get_ref().len() {
86        let buf = &this.encoder.get_ref()[*this.pos..];
87        *this.pos = this.encoder.get_ref().len();
88        // Immediately send the chunk and return Ready.
89        return Poll::Ready(Some(Ok(Bytes::copy_from_slice(buf))));
90      }
91      // 2) If we already finished and nothing is left, end the stream.
92      if *this.done {
93        return Poll::Ready(None);
94      }
95      // 3) Poll the inner stream for more input data.
96      match this.inner.as_mut().poll_next(cx) {
97        // New chunk arrived: compress it, then loop again
98        // (now the buffer certainly contains data).
99        Poll::Ready(Some(Ok(chunk))) => {
100          if let Err(e) = this
101            .encoder
102            .write_all(&chunk)
103            .and_then(|_| this.encoder.flush())
104          {
105            return Poll::Ready(Some(Err(e.into())));
106          }
107          continue;
108        }
109        // Error from the inner stream — propagate it.
110        Poll::Ready(Some(Err(e))) => {
111          return Poll::Ready(Some(Err(e)));
112        }
113        // Inner stream finished: finalize the encoder,
114        // then loop to drain the remaining bytes.
115        Poll::Ready(None) => {
116          *this.done = true;
117          if let Err(e) = this.encoder.flush() {
118            return Poll::Ready(Some(Err(e.into())));
119          }
120          continue;
121        }
122        // No new input and no buffered output: we must wait.
123        Poll::Pending => {
124          return Poll::Pending;
125        }
126      }
127    }
128  }
129}