Skip to main content

warp/filters/
compression.rs

1//! Compression Filters
2//!
3//! Filters that compress the body of a response.
4
5#[cfg(feature = "compression-brotli")]
6use async_compression::tokio::bufread::BrotliEncoder;
7
8#[cfg(feature = "compression-gzip")]
9use async_compression::tokio::bufread::{DeflateEncoder, GzipEncoder};
10
11use crate::bodyt::Body;
12use http::header::{HeaderValue, CONTENT_ENCODING, CONTENT_LENGTH};
13use tokio_util::io::{ReaderStream, StreamReader};
14
15use crate::filter::{Filter, WrapSealed};
16use crate::reject::IsReject;
17use crate::reply::{Reply, Response};
18
19use self::internal::{CompressionProps, WithCompression};
20
21enum CompressionAlgo {
22    #[cfg(feature = "compression-brotli")]
23    BR,
24    #[cfg(feature = "compression-gzip")]
25    DEFLATE,
26    #[cfg(feature = "compression-gzip")]
27    GZIP,
28}
29
30impl From<CompressionAlgo> for HeaderValue {
31    #[inline]
32    fn from(algo: CompressionAlgo) -> Self {
33        HeaderValue::from_static(match algo {
34            #[cfg(feature = "compression-brotli")]
35            CompressionAlgo::BR => "br",
36            #[cfg(feature = "compression-gzip")]
37            CompressionAlgo::DEFLATE => "deflate",
38            #[cfg(feature = "compression-gzip")]
39            CompressionAlgo::GZIP => "gzip",
40        })
41    }
42}
43
44/// Compression
45#[derive(Clone, Copy, Debug)]
46pub struct Compression<F> {
47    func: F,
48}
49
50// TODO: The implementation of `gzip()`, `deflate()`, and `brotli()` could be replaced with
51// generics or a macro
52
53/// Create a wrapping filter that compresses the Body of a [`Response`](crate::reply::Response)
54/// using gzip, adding `content-encoding: gzip` to the Response's [`HeaderMap`](hyper::HeaderMap)
55///
56/// # Example
57///
58/// ```
59/// use warp::Filter;
60///
61/// let route = warp::get()
62///     .and(warp::path::end())
63///     .and(warp::fs::file("./README.md"))
64///     .with(warp::compression::gzip());
65/// ```
66#[cfg(feature = "compression-gzip")]
67pub fn gzip() -> Compression<impl Fn(CompressionProps) -> Response + Copy> {
68    let func = move |mut props: CompressionProps| {
69        let body = Body::wrap_stream(ReaderStream::new(GzipEncoder::new(StreamReader::new(
70            props.body,
71        ))));
72        props
73            .head
74            .headers
75            .append(CONTENT_ENCODING, CompressionAlgo::GZIP.into());
76        props.head.headers.remove(CONTENT_LENGTH);
77        Response::from_parts(props.head, body)
78    };
79    Compression { func }
80}
81
82/// Create a wrapping filter that compresses the Body of a [`Response`](crate::reply::Response)
83/// using deflate, adding `content-encoding: deflate` to the Response's [`HeaderMap`](hyper::HeaderMap)
84///
85/// # Example
86///
87/// ```
88/// use warp::Filter;
89///
90/// let route = warp::get()
91///     .and(warp::path::end())
92///     .and(warp::fs::file("./README.md"))
93///     .with(warp::compression::deflate());
94/// ```
95#[cfg(feature = "compression-gzip")]
96pub fn deflate() -> Compression<impl Fn(CompressionProps) -> Response + Copy> {
97    let func = move |mut props: CompressionProps| {
98        let body = Body::wrap_stream(ReaderStream::new(DeflateEncoder::new(StreamReader::new(
99            props.body,
100        ))));
101        props
102            .head
103            .headers
104            .append(CONTENT_ENCODING, CompressionAlgo::DEFLATE.into());
105        props.head.headers.remove(CONTENT_LENGTH);
106        Response::from_parts(props.head, body)
107    };
108    Compression { func }
109}
110
111/// Create a wrapping filter that compresses the Body of a [`Response`](crate::reply::Response)
112/// using brotli, adding `content-encoding: br` to the Response's [`HeaderMap`](hyper::HeaderMap)
113///
114/// # Example
115///
116/// ```
117/// use warp::Filter;
118///
119/// let route = warp::get()
120///     .and(warp::path::end())
121///     .and(warp::fs::file("./README.md"))
122///     .with(warp::compression::brotli());
123/// ```
124#[cfg(feature = "compression-brotli")]
125pub fn brotli() -> Compression<impl Fn(CompressionProps) -> Response + Copy> {
126    let func = move |mut props: CompressionProps| {
127        let body = Body::wrap_stream(ReaderStream::new(BrotliEncoder::new(StreamReader::new(
128            props.body,
129        ))));
130        props
131            .head
132            .headers
133            .append(CONTENT_ENCODING, CompressionAlgo::BR.into());
134        props.head.headers.remove(CONTENT_LENGTH);
135        Response::from_parts(props.head, body)
136    };
137    Compression { func }
138}
139
140impl<FN, F> WrapSealed<F> for Compression<FN>
141where
142    FN: Fn(CompressionProps) -> Response + Clone + Send,
143    F: Filter + Clone + Send,
144    F::Extract: Reply,
145    F::Error: IsReject,
146{
147    type Wrapped = WithCompression<FN, F>;
148
149    fn wrap(&self, filter: F) -> Self::Wrapped {
150        WithCompression {
151            filter,
152            compress: self.clone(),
153        }
154    }
155}
156
157mod internal {
158    use std::future::Future;
159    use std::pin::Pin;
160    use std::task::{Context, Poll};
161
162    use bytes::Bytes;
163    use futures_util::{ready, Stream, TryFuture};
164    use http_body_util::BodyDataStream;
165    use pin_project::pin_project;
166
167    use crate::bodyt::Body;
168    use crate::filter::{Filter, FilterBase, Internal};
169    use crate::reject::IsReject;
170    use crate::reply::{Reply, Response};
171
172    use super::Compression;
173
174    /// A wrapper around any type that implements [`Stream`](futures::Stream) to be
175    /// compatible with async_compression's Stream based encoders
176    #[pin_project]
177    #[derive(Debug)]
178    pub struct CompressableBody<S, E>
179    where
180        E: std::error::Error,
181        S: Stream<Item = Result<Bytes, E>>,
182    {
183        #[pin]
184        body: S,
185    }
186
187    impl<S, E> Stream for CompressableBody<S, E>
188    where
189        E: std::error::Error,
190        S: Stream<Item = Result<Bytes, E>>,
191    {
192        type Item = std::io::Result<Bytes>;
193
194        fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
195            use std::io::{Error, ErrorKind};
196
197            let pin = self.project();
198            S::poll_next(pin.body, cx).map_err(|_| Error::from(ErrorKind::InvalidData))
199        }
200    }
201
202    impl From<Body> for CompressableBody<BodyDataStream<Body>, crate::Error> {
203        fn from(body: Body) -> Self {
204            CompressableBody {
205                body: BodyDataStream::new(body),
206            }
207        }
208    }
209
210    /// Compression Props
211    #[derive(Debug)]
212    pub struct CompressionProps {
213        pub(super) body: CompressableBody<BodyDataStream<Body>, crate::Error>,
214        pub(super) head: http::response::Parts,
215    }
216
217    impl From<http::Response<Body>> for CompressionProps {
218        fn from(resp: http::Response<Body>) -> Self {
219            let (head, body) = resp.into_parts();
220            CompressionProps {
221                body: body.into(),
222                head,
223            }
224        }
225    }
226
227    #[allow(missing_debug_implementations)]
228    pub struct Compressed(pub(super) Response);
229
230    impl Reply for Compressed {
231        #[inline]
232        fn into_response(self) -> Response {
233            self.0
234        }
235    }
236
237    #[allow(missing_debug_implementations)]
238    #[derive(Clone, Copy)]
239    pub struct WithCompression<FN, F> {
240        pub(super) compress: Compression<FN>,
241        pub(super) filter: F,
242    }
243
244    impl<FN, F> FilterBase for WithCompression<FN, F>
245    where
246        FN: Fn(CompressionProps) -> Response + Clone + Send,
247        F: Filter + Clone + Send,
248        F::Extract: Reply,
249        F::Error: IsReject,
250    {
251        type Extract = (Compressed,);
252        type Error = F::Error;
253        type Future = WithCompressionFuture<FN, F::Future>;
254
255        fn filter(&self, _: Internal) -> Self::Future {
256            WithCompressionFuture {
257                compress: self.compress.clone(),
258                future: self.filter.filter(Internal),
259            }
260        }
261    }
262
263    #[allow(missing_debug_implementations)]
264    #[pin_project]
265    pub struct WithCompressionFuture<FN, F> {
266        compress: Compression<FN>,
267        #[pin]
268        future: F,
269    }
270
271    impl<FN, F> Future for WithCompressionFuture<FN, F>
272    where
273        FN: Fn(CompressionProps) -> Response,
274        F: TryFuture,
275        F::Ok: Reply,
276        F::Error: IsReject,
277    {
278        type Output = Result<(Compressed,), F::Error>;
279
280        fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
281            let pin = self.as_mut().project();
282            let result = ready!(pin.future.try_poll(cx));
283            match result {
284                Ok(reply) => {
285                    let resp = (self.compress.func)(reply.into_response().into());
286                    Poll::Ready(Ok((Compressed(resp),)))
287                }
288                Err(reject) => Poll::Ready(Err(reject)),
289            }
290        }
291    }
292}