tako/plugins/compression/gzip_stream.rs
1//! Gzip compression streaming utilities for efficient HTTP response compression.
2//!
3//! This module provides streaming Gzip compression for HTTP response bodies using the
4//! flate2 crate. Gzip is one of the most widely supported compression formats on the web,
5//! offering excellent compatibility across all browsers and HTTP clients. The streaming
6//! implementation enables memory-efficient compression of large responses without
7//! buffering entire content in memory, making it ideal for real-time web applications.
8//!
9//! # Examples
10//!
11//! ```rust
12//! use tako::plugins::compression::gzip_stream::stream_gzip;
13//! use http_body_util::Full;
14//! use bytes::Bytes;
15//!
16//! // Compress a response body with Gzip level 6
17//! let body = Full::from(Bytes::from("Hello, World! This is test content."));
18//! let compressed = stream_gzip(body, 6);
19//!
20//! // Fast compression for dynamic API responses
21//! let api_response = Full::from(Bytes::from("JSON API data..."));
22//! let fast_compressed = stream_gzip(api_response, 1);
23//! ```
24
25use std::{
26 io::Write,
27 pin::Pin,
28 task::{Context, Poll},
29};
30
31use anyhow::Result;
32use bytes::Bytes;
33use flate2::{Compression, write::GzEncoder};
34use futures_util::{Stream, TryStreamExt};
35use http_body::{Body, Frame};
36use http_body_util::BodyExt;
37use pin_project_lite::pin_project;
38
39use crate::{body::TakoBody, types::BoxError};
40
41/// Compresses an HTTP body stream using Gzip compression algorithm.
42pub fn stream_gzip<B>(body: B, level: u32) -> TakoBody
43where
44 B: Body<Data = Bytes, Error = BoxError> + Send + 'static,
45{
46 let upstream = body.into_data_stream();
47 let gzip = GzipStream::new(upstream, level).map_ok(Frame::data);
48 TakoBody::from_try_stream(gzip)
49}
50
pin_project! {
    /// Streaming Gzip compressor that wraps an inner data stream.
    ///
    /// Compressed output accumulates in the encoder's in-memory `Vec<u8>`
    /// buffer and is handed out as `Bytes` chunks by the `Stream` impl.
    pub struct GzipStream<S> {
        /// Upstream source of raw (uncompressed) `Bytes` chunks.
        #[pin] inner: S,
        /// Gzip encoder writing compressed output into an in-memory buffer.
        encoder: GzEncoder<Vec<u8>>,
        /// Offset of the first not-yet-emitted byte in the encoder's buffer.
        pos: usize,
        /// Set once the inner stream has reported end-of-input.
        done: bool,
    }
}
60
61impl<S> GzipStream<S> {
62 /// Creates a new Gzip compression stream with the specified compression level.
63 fn new(stream: S, level: u32) -> Self {
64 Self {
65 inner: stream,
66 encoder: GzEncoder::new(Vec::new(), Compression::new(level)),
67 pos: 0,
68 done: false,
69 }
70 }
71}
72
73impl<S> Stream for GzipStream<S>
74where
75 S: Stream<Item = Result<Bytes, BoxError>>,
76{
77 type Item = Result<Bytes, BoxError>;
78
79 /// Polls the stream for the next compressed data chunk.
80 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
81 let mut this = self.project();
82
83 loop {
84 // 1) Do we still have unread bytes in the encoder buffer?
85 if *this.pos < this.encoder.get_ref().len() {
86 let buf = &this.encoder.get_ref()[*this.pos..];
87 *this.pos = this.encoder.get_ref().len();
88 // Immediately send the chunk and return Ready.
89 return Poll::Ready(Some(Ok(Bytes::copy_from_slice(buf))));
90 }
91 // 2) If we already finished and nothing is left, end the stream.
92 if *this.done {
93 return Poll::Ready(None);
94 }
95 // 3) Poll the inner stream for more input data.
96 match this.inner.as_mut().poll_next(cx) {
97 // New chunk arrived: compress it, then loop again
98 // (now the buffer certainly contains data).
99 Poll::Ready(Some(Ok(chunk))) => {
100 if let Err(e) = this
101 .encoder
102 .write_all(&chunk)
103 .and_then(|_| this.encoder.flush())
104 {
105 return Poll::Ready(Some(Err(e.into())));
106 }
107 continue;
108 }
109 // Error from the inner stream — propagate it.
110 Poll::Ready(Some(Err(e))) => {
111 return Poll::Ready(Some(Err(e)));
112 }
113 // Inner stream finished: finalize the encoder,
114 // then loop to drain the remaining bytes.
115 Poll::Ready(None) => {
116 *this.done = true;
117 if let Err(e) = this.encoder.flush() {
118 return Poll::Ready(Some(Err(e.into())));
119 }
120 continue;
121 }
122 // No new input and no buffered output: we must wait.
123 Poll::Pending => {
124 return Poll::Pending;
125 }
126 }
127 }
128 }
129}