1#[cfg(feature = "compression-brotli")]
6use async_compression::tokio::bufread::BrotliEncoder;
7
8#[cfg(feature = "compression-gzip")]
9use async_compression::tokio::bufread::{DeflateEncoder, GzipEncoder};
10
11use crate::bodyt::Body;
12use http::header::{HeaderValue, CONTENT_ENCODING, CONTENT_LENGTH};
13use tokio_util::io::{ReaderStream, StreamReader};
14
15use crate::filter::{Filter, WrapSealed};
16use crate::reject::IsReject;
17use crate::reply::{Reply, Response};
18
19use self::internal::{CompressionProps, WithCompression};
20
/// Content codings this module can apply to a response body.
///
/// Each variant only exists when the cargo feature named in its `cfg`
/// attribute is enabled.
enum CompressionAlgo {
    /// Brotli; converted to the header token `br`.
    #[cfg(feature = "compression-brotli")]
    BR,
    /// Deflate; converted to the header token `deflate`.
    #[cfg(feature = "compression-gzip")]
    DEFLATE,
    /// Gzip; converted to the header token `gzip`.
    #[cfg(feature = "compression-gzip")]
    GZIP,
}
29
30impl From<CompressionAlgo> for HeaderValue {
31 #[inline]
32 fn from(algo: CompressionAlgo) -> Self {
33 HeaderValue::from_static(match algo {
34 #[cfg(feature = "compression-brotli")]
35 CompressionAlgo::BR => "br",
36 #[cfg(feature = "compression-gzip")]
37 CompressionAlgo::DEFLATE => "deflate",
38 #[cfg(feature = "compression-gzip")]
39 CompressionAlgo::GZIP => "gzip",
40 })
41 }
42}
43
/// Wrapper holding the function that turns a response into its compressed form.
///
/// Values are built by the constructor functions in this module and are applied
/// to a filter through this type's `WrapSealed` implementation.
#[derive(Clone, Copy, Debug)]
pub struct Compression<F> {
    /// Receives the split-up response and returns the rewritten one.
    func: F,
}
49
/// Creates a [`Compression`] wrapper that gzip-encodes response bodies.
///
/// For every response it handles, the wrapper:
/// - re-encodes the body as a gzip stream,
/// - appends `gzip` to the `Content-Encoding` header, and
/// - removes the `Content-Length` header.
#[cfg(feature = "compression-gzip")]
pub fn gzip() -> Compression<impl Fn(CompressionProps) -> Response + Copy> {
    Compression {
        func: move |props: CompressionProps| {
            let CompressionProps { body, mut head } = props;

            // Body stream -> AsyncBufRead -> gzip encoder -> byte stream -> Body.
            let encoder = GzipEncoder::new(StreamReader::new(body));
            let compressed = Body::wrap_stream(ReaderStream::new(encoder));

            head.headers
                .append(CONTENT_ENCODING, HeaderValue::from(CompressionAlgo::GZIP));
            // Any previously declared length described the unencoded body.
            head.headers.remove(CONTENT_LENGTH);

            Response::from_parts(head, compressed)
        },
    }
}
81
/// Creates a [`Compression`] wrapper that deflate-encodes response bodies.
///
/// For every response it handles, the wrapper:
/// - re-encodes the body as a deflate stream,
/// - appends `deflate` to the `Content-Encoding` header, and
/// - removes the `Content-Length` header.
#[cfg(feature = "compression-gzip")]
pub fn deflate() -> Compression<impl Fn(CompressionProps) -> Response + Copy> {
    Compression {
        func: move |props: CompressionProps| {
            let CompressionProps { body, mut head } = props;

            // Body stream -> AsyncBufRead -> deflate encoder -> byte stream -> Body.
            let encoder = DeflateEncoder::new(StreamReader::new(body));
            let compressed = Body::wrap_stream(ReaderStream::new(encoder));

            head.headers.append(
                CONTENT_ENCODING,
                HeaderValue::from(CompressionAlgo::DEFLATE),
            );
            // Any previously declared length described the unencoded body.
            head.headers.remove(CONTENT_LENGTH);

            Response::from_parts(head, compressed)
        },
    }
}
110
/// Creates a [`Compression`] wrapper that brotli-encodes response bodies.
///
/// For every response it handles, the wrapper:
/// - re-encodes the body as a brotli stream,
/// - appends `br` to the `Content-Encoding` header, and
/// - removes the `Content-Length` header.
#[cfg(feature = "compression-brotli")]
pub fn brotli() -> Compression<impl Fn(CompressionProps) -> Response + Copy> {
    Compression {
        func: move |props: CompressionProps| {
            let CompressionProps { body, mut head } = props;

            // Body stream -> AsyncBufRead -> brotli encoder -> byte stream -> Body.
            let encoder = BrotliEncoder::new(StreamReader::new(body));
            let compressed = Body::wrap_stream(ReaderStream::new(encoder));

            head.headers
                .append(CONTENT_ENCODING, HeaderValue::from(CompressionAlgo::BR));
            // Any previously declared length described the unencoded body.
            head.headers.remove(CONTENT_LENGTH);

            Response::from_parts(head, compressed)
        },
    }
}
139
140impl<FN, F> WrapSealed<F> for Compression<FN>
141where
142 FN: Fn(CompressionProps) -> Response + Clone + Send,
143 F: Filter + Clone + Send,
144 F::Extract: Reply,
145 F::Error: IsReject,
146{
147 type Wrapped = WithCompression<FN, F>;
148
149 fn wrap(&self, filter: F) -> Self::Wrapped {
150 WithCompression {
151 filter,
152 compress: self.clone(),
153 }
154 }
155}
156
157mod internal {
158 use std::future::Future;
159 use std::pin::Pin;
160 use std::task::{Context, Poll};
161
162 use bytes::Bytes;
163 use futures_util::{ready, Stream, TryFuture};
164 use http_body_util::BodyDataStream;
165 use pin_project::pin_project;
166
167 use crate::bodyt::Body;
168 use crate::filter::{Filter, FilterBase, Internal};
169 use crate::reject::IsReject;
170 use crate::reply::{Reply, Response};
171
172 use super::Compression;
173
174 #[pin_project]
177 #[derive(Debug)]
178 pub struct CompressableBody<S, E>
179 where
180 E: std::error::Error,
181 S: Stream<Item = Result<Bytes, E>>,
182 {
183 #[pin]
184 body: S,
185 }
186
187 impl<S, E> Stream for CompressableBody<S, E>
188 where
189 E: std::error::Error,
190 S: Stream<Item = Result<Bytes, E>>,
191 {
192 type Item = std::io::Result<Bytes>;
193
194 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
195 use std::io::{Error, ErrorKind};
196
197 let pin = self.project();
198 S::poll_next(pin.body, cx).map_err(|_| Error::from(ErrorKind::InvalidData))
199 }
200 }
201
202 impl From<Body> for CompressableBody<BodyDataStream<Body>, crate::Error> {
203 fn from(body: Body) -> Self {
204 CompressableBody {
205 body: BodyDataStream::new(body),
206 }
207 }
208 }
209
210 #[derive(Debug)]
212 pub struct CompressionProps {
213 pub(super) body: CompressableBody<BodyDataStream<Body>, crate::Error>,
214 pub(super) head: http::response::Parts,
215 }
216
217 impl From<http::Response<Body>> for CompressionProps {
218 fn from(resp: http::Response<Body>) -> Self {
219 let (head, body) = resp.into_parts();
220 CompressionProps {
221 body: body.into(),
222 head,
223 }
224 }
225 }
226
227 #[allow(missing_debug_implementations)]
228 pub struct Compressed(pub(super) Response);
229
230 impl Reply for Compressed {
231 #[inline]
232 fn into_response(self) -> Response {
233 self.0
234 }
235 }
236
237 #[allow(missing_debug_implementations)]
238 #[derive(Clone, Copy)]
239 pub struct WithCompression<FN, F> {
240 pub(super) compress: Compression<FN>,
241 pub(super) filter: F,
242 }
243
244 impl<FN, F> FilterBase for WithCompression<FN, F>
245 where
246 FN: Fn(CompressionProps) -> Response + Clone + Send,
247 F: Filter + Clone + Send,
248 F::Extract: Reply,
249 F::Error: IsReject,
250 {
251 type Extract = (Compressed,);
252 type Error = F::Error;
253 type Future = WithCompressionFuture<FN, F::Future>;
254
255 fn filter(&self, _: Internal) -> Self::Future {
256 WithCompressionFuture {
257 compress: self.compress.clone(),
258 future: self.filter.filter(Internal),
259 }
260 }
261 }
262
263 #[allow(missing_debug_implementations)]
264 #[pin_project]
265 pub struct WithCompressionFuture<FN, F> {
266 compress: Compression<FN>,
267 #[pin]
268 future: F,
269 }
270
271 impl<FN, F> Future for WithCompressionFuture<FN, F>
272 where
273 FN: Fn(CompressionProps) -> Response,
274 F: TryFuture,
275 F::Ok: Reply,
276 F::Error: IsReject,
277 {
278 type Output = Result<(Compressed,), F::Error>;
279
280 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
281 let pin = self.as_mut().project();
282 let result = ready!(pin.future.try_poll(cx));
283 match result {
284 Ok(reply) => {
285 let resp = (self.compress.func)(reply.into_response().into());
286 Poll::Ready(Ok((Compressed(resp),)))
287 }
288 Err(reject) => Poll::Ready(Err(reject)),
289 }
290 }
291 }
292}