rama_http/layer/compression/
body.rs

1#![allow(unused_imports)]
2
3use crate::HeaderMap;
4use crate::dep::http_body::{Body, Frame};
5use crate::layer::util::compression::{
6    AsyncReadBody, BodyIntoStream, CompressionLevel, DecorateAsyncRead, WrapBody,
7};
8use rama_core::error::BoxError;
9
10use async_compression::tokio::bufread::{BrotliEncoder, GzipEncoder, ZlibEncoder, ZstdEncoder};
11
12use bytes::{Buf, Bytes};
13use futures_lite::ready;
14use pin_project_lite::pin_project;
15use std::{
16    io,
17    marker::PhantomData,
18    pin::Pin,
19    task::{Context, Poll},
20};
21use tokio_util::io::StreamReader;
22
23use super::pin_project_cfg::pin_project_cfg;
24
25pin_project! {
26    /// Response body of [`Compression`].
27    ///
28    /// [`Compression`]: super::Compression
29    pub struct CompressionBody<B>
30    where
31        B: Body,
32    {
33        #[pin]
34        pub(crate) inner: BodyInner<B>,
35    }
36}
37
38impl<B> Default for CompressionBody<B>
39where
40    B: Body + Default,
41{
42    fn default() -> Self {
43        Self {
44            inner: BodyInner::Identity {
45                inner: B::default(),
46            },
47        }
48    }
49}
50
51impl<B> CompressionBody<B>
52where
53    B: Body,
54{
55    pub(crate) fn new(inner: BodyInner<B>) -> Self {
56        Self { inner }
57    }
58}
59
60type GzipBody<B> = WrapBody<GzipEncoder<B>>;
61
62type DeflateBody<B> = WrapBody<ZlibEncoder<B>>;
63
64type BrotliBody<B> = WrapBody<BrotliEncoder<B>>;
65
66type ZstdBody<B> = WrapBody<ZstdEncoder<B>>;
67
68pin_project_cfg! {
69    #[project = BodyInnerProj]
70    pub(crate) enum BodyInner<B>
71    where
72        B: Body,
73    {
74        Gzip {
75            #[pin]
76            inner: GzipBody<B>,
77        },
78        Deflate {
79            #[pin]
80            inner: DeflateBody<B>,
81        },
82        Brotli {
83            #[pin]
84            inner: BrotliBody<B>,
85        },
86        Zstd {
87            #[pin]
88            inner: ZstdBody<B>,
89        },
90        Identity {
91            #[pin]
92            inner: B,
93        },
94    }
95}
96
97impl<B: Body> BodyInner<B> {
98    pub(crate) fn gzip(inner: WrapBody<GzipEncoder<B>>) -> Self {
99        Self::Gzip { inner }
100    }
101
102    pub(crate) fn deflate(inner: WrapBody<ZlibEncoder<B>>) -> Self {
103        Self::Deflate { inner }
104    }
105
106    pub(crate) fn brotli(inner: WrapBody<BrotliEncoder<B>>) -> Self {
107        Self::Brotli { inner }
108    }
109
110    pub(crate) fn zstd(inner: WrapBody<ZstdEncoder<B>>) -> Self {
111        Self::Zstd { inner }
112    }
113
114    pub(crate) fn identity(inner: B) -> Self {
115        Self::Identity { inner }
116    }
117}
118
119impl<B> Body for CompressionBody<B>
120where
121    B: Body<Error: Into<BoxError>>,
122{
123    type Data = Bytes;
124    type Error = BoxError;
125
126    fn poll_frame(
127        self: Pin<&mut Self>,
128        cx: &mut Context<'_>,
129    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
130        match self.project().inner.project() {
131            BodyInnerProj::Gzip { inner } => inner.poll_frame(cx),
132            BodyInnerProj::Deflate { inner } => inner.poll_frame(cx),
133            BodyInnerProj::Brotli { inner } => inner.poll_frame(cx),
134            BodyInnerProj::Zstd { inner } => inner.poll_frame(cx),
135            BodyInnerProj::Identity { inner } => match ready!(inner.poll_frame(cx)) {
136                Some(Ok(frame)) => {
137                    let frame = frame.map_data(|mut buf| buf.copy_to_bytes(buf.remaining()));
138                    Poll::Ready(Some(Ok(frame)))
139                }
140                Some(Err(err)) => Poll::Ready(Some(Err(err.into()))),
141                None => Poll::Ready(None),
142            },
143        }
144    }
145
146    fn size_hint(&self) -> rama_http_types::dep::http_body::SizeHint {
147        if let BodyInner::Identity { inner } = &self.inner {
148            inner.size_hint()
149        } else {
150            rama_http_types::dep::http_body::SizeHint::new()
151        }
152    }
153
154    fn is_end_stream(&self) -> bool {
155        if let BodyInner::Identity { inner } = &self.inner {
156            inner.is_end_stream()
157        } else {
158            false
159        }
160    }
161}
162
163impl<B> DecorateAsyncRead for GzipEncoder<B>
164where
165    B: Body,
166{
167    type Input = AsyncReadBody<B>;
168    type Output = GzipEncoder<Self::Input>;
169
170    fn apply(input: Self::Input, quality: CompressionLevel) -> Self::Output {
171        GzipEncoder::with_quality(input, quality.into_async_compression())
172    }
173
174    fn get_pin_mut(pinned: Pin<&mut Self::Output>) -> Pin<&mut Self::Input> {
175        pinned.get_pin_mut()
176    }
177}
178
179impl<B> DecorateAsyncRead for ZlibEncoder<B>
180where
181    B: Body,
182{
183    type Input = AsyncReadBody<B>;
184    type Output = ZlibEncoder<Self::Input>;
185
186    fn apply(input: Self::Input, quality: CompressionLevel) -> Self::Output {
187        ZlibEncoder::with_quality(input, quality.into_async_compression())
188    }
189
190    fn get_pin_mut(pinned: Pin<&mut Self::Output>) -> Pin<&mut Self::Input> {
191        pinned.get_pin_mut()
192    }
193}
194
195impl<B> DecorateAsyncRead for BrotliEncoder<B>
196where
197    B: Body,
198{
199    type Input = AsyncReadBody<B>;
200    type Output = BrotliEncoder<Self::Input>;
201
202    fn apply(input: Self::Input, quality: CompressionLevel) -> Self::Output {
203        // The brotli crate used under the hood here has a default compression level of 11,
204        // which is the max for brotli. This causes extremely slow compression times, so we
205        // manually set a default of 4 here.
206        //
207        // This is the same default used by NGINX for on-the-fly brotli compression.
208        let level = match quality {
209            CompressionLevel::Default => async_compression::Level::Precise(4),
210            other => other.into_async_compression(),
211        };
212        BrotliEncoder::with_quality(input, level)
213    }
214
215    fn get_pin_mut(pinned: Pin<&mut Self::Output>) -> Pin<&mut Self::Input> {
216        pinned.get_pin_mut()
217    }
218}
219
220impl<B> DecorateAsyncRead for ZstdEncoder<B>
221where
222    B: Body,
223{
224    type Input = AsyncReadBody<B>;
225    type Output = ZstdEncoder<Self::Input>;
226
227    fn apply(input: Self::Input, quality: CompressionLevel) -> Self::Output {
228        // See https://issues.chromium.org/issues/41493659:
229        //  "For memory usage reasons, Chromium limits the window size to 8MB"
230        // See https://datatracker.ietf.org/doc/html/rfc8878#name-window-descriptor
231        //  "For improved interoperability, it's recommended for decoders to support values
232        //  of Window_Size up to 8 MB and for encoders not to generate frames requiring a
233        //  Window_Size larger than 8 MB."
234        // Level 17 in zstd (as of v1.5.6) is the first level with a window size of 8 MB (2^23):
235        // https://github.com/facebook/zstd/blob/v1.5.6/lib/compress/clevels.h#L25-L51
236        // Set the parameter for all levels >= 17. This will either have no effect (but reduce
237        // the risk of future changes in zstd) or limit the window log to 8MB.
238        let needs_window_limit = match quality {
239            CompressionLevel::Best => true, // level 20
240            CompressionLevel::Precise(level) => level >= 17,
241            _ => false,
242        };
243        // The parameter is not set for levels below 17 as it will increase the window size
244        // for those levels.
245        if needs_window_limit {
246            let params = [async_compression::zstd::CParameter::window_log(23)];
247            ZstdEncoder::with_quality_and_params(input, quality.into_async_compression(), &params)
248        } else {
249            ZstdEncoder::with_quality(input, quality.into_async_compression())
250        }
251    }
252
253    fn get_pin_mut(pinned: Pin<&mut Self::Output>) -> Pin<&mut Self::Input> {
254        pinned.get_pin_mut()
255    }
256}