rama_http/layer/compression/
body.rs1#![allow(unused_imports)]
2
3use crate::HeaderMap;
4use crate::dep::http_body::{Body, Frame};
5use crate::layer::util::compression::{
6 AsyncReadBody, BodyIntoStream, CompressionLevel, DecorateAsyncRead, WrapBody,
7};
8use rama_core::error::BoxError;
9
10use async_compression::tokio::bufread::{BrotliEncoder, GzipEncoder, ZlibEncoder, ZstdEncoder};
11
12use bytes::{Buf, Bytes};
13use futures_lite::ready;
14use pin_project_lite::pin_project;
15use std::{
16 io,
17 marker::PhantomData,
18 pin::Pin,
19 task::{Context, Poll},
20};
21use tokio_util::io::StreamReader;
22
23use super::pin_project_cfg::pin_project_cfg;
24
25pin_project! {
26 pub struct CompressionBody<B>
30 where
31 B: Body,
32 {
33 #[pin]
34 pub(crate) inner: BodyInner<B>,
35 }
36}
37
38impl<B> Default for CompressionBody<B>
39where
40 B: Body + Default,
41{
42 fn default() -> Self {
43 Self {
44 inner: BodyInner::Identity {
45 inner: B::default(),
46 },
47 }
48 }
49}
50
51impl<B> CompressionBody<B>
52where
53 B: Body,
54{
55 pub(crate) fn new(inner: BodyInner<B>) -> Self {
56 Self { inner }
57 }
58}
59
60type GzipBody<B> = WrapBody<GzipEncoder<B>>;
61
62type DeflateBody<B> = WrapBody<ZlibEncoder<B>>;
63
64type BrotliBody<B> = WrapBody<BrotliEncoder<B>>;
65
66type ZstdBody<B> = WrapBody<ZstdEncoder<B>>;
67
68pin_project_cfg! {
69 #[project = BodyInnerProj]
70 pub(crate) enum BodyInner<B>
71 where
72 B: Body,
73 {
74 Gzip {
75 #[pin]
76 inner: GzipBody<B>,
77 },
78 Deflate {
79 #[pin]
80 inner: DeflateBody<B>,
81 },
82 Brotli {
83 #[pin]
84 inner: BrotliBody<B>,
85 },
86 Zstd {
87 #[pin]
88 inner: ZstdBody<B>,
89 },
90 Identity {
91 #[pin]
92 inner: B,
93 },
94 }
95}
96
97impl<B: Body> BodyInner<B> {
98 pub(crate) fn gzip(inner: WrapBody<GzipEncoder<B>>) -> Self {
99 Self::Gzip { inner }
100 }
101
102 pub(crate) fn deflate(inner: WrapBody<ZlibEncoder<B>>) -> Self {
103 Self::Deflate { inner }
104 }
105
106 pub(crate) fn brotli(inner: WrapBody<BrotliEncoder<B>>) -> Self {
107 Self::Brotli { inner }
108 }
109
110 pub(crate) fn zstd(inner: WrapBody<ZstdEncoder<B>>) -> Self {
111 Self::Zstd { inner }
112 }
113
114 pub(crate) fn identity(inner: B) -> Self {
115 Self::Identity { inner }
116 }
117}
118
119impl<B> Body for CompressionBody<B>
120where
121 B: Body<Error: Into<BoxError>>,
122{
123 type Data = Bytes;
124 type Error = BoxError;
125
126 fn poll_frame(
127 self: Pin<&mut Self>,
128 cx: &mut Context<'_>,
129 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
130 match self.project().inner.project() {
131 BodyInnerProj::Gzip { inner } => inner.poll_frame(cx),
132 BodyInnerProj::Deflate { inner } => inner.poll_frame(cx),
133 BodyInnerProj::Brotli { inner } => inner.poll_frame(cx),
134 BodyInnerProj::Zstd { inner } => inner.poll_frame(cx),
135 BodyInnerProj::Identity { inner } => match ready!(inner.poll_frame(cx)) {
136 Some(Ok(frame)) => {
137 let frame = frame.map_data(|mut buf| buf.copy_to_bytes(buf.remaining()));
138 Poll::Ready(Some(Ok(frame)))
139 }
140 Some(Err(err)) => Poll::Ready(Some(Err(err.into()))),
141 None => Poll::Ready(None),
142 },
143 }
144 }
145
146 fn size_hint(&self) -> rama_http_types::dep::http_body::SizeHint {
147 if let BodyInner::Identity { inner } = &self.inner {
148 inner.size_hint()
149 } else {
150 rama_http_types::dep::http_body::SizeHint::new()
151 }
152 }
153
154 fn is_end_stream(&self) -> bool {
155 if let BodyInner::Identity { inner } = &self.inner {
156 inner.is_end_stream()
157 } else {
158 false
159 }
160 }
161}
162
163impl<B> DecorateAsyncRead for GzipEncoder<B>
164where
165 B: Body,
166{
167 type Input = AsyncReadBody<B>;
168 type Output = GzipEncoder<Self::Input>;
169
170 fn apply(input: Self::Input, quality: CompressionLevel) -> Self::Output {
171 GzipEncoder::with_quality(input, quality.into_async_compression())
172 }
173
174 fn get_pin_mut(pinned: Pin<&mut Self::Output>) -> Pin<&mut Self::Input> {
175 pinned.get_pin_mut()
176 }
177}
178
179impl<B> DecorateAsyncRead for ZlibEncoder<B>
180where
181 B: Body,
182{
183 type Input = AsyncReadBody<B>;
184 type Output = ZlibEncoder<Self::Input>;
185
186 fn apply(input: Self::Input, quality: CompressionLevel) -> Self::Output {
187 ZlibEncoder::with_quality(input, quality.into_async_compression())
188 }
189
190 fn get_pin_mut(pinned: Pin<&mut Self::Output>) -> Pin<&mut Self::Input> {
191 pinned.get_pin_mut()
192 }
193}
194
195impl<B> DecorateAsyncRead for BrotliEncoder<B>
196where
197 B: Body,
198{
199 type Input = AsyncReadBody<B>;
200 type Output = BrotliEncoder<Self::Input>;
201
202 fn apply(input: Self::Input, quality: CompressionLevel) -> Self::Output {
203 let level = match quality {
209 CompressionLevel::Default => async_compression::Level::Precise(4),
210 other => other.into_async_compression(),
211 };
212 BrotliEncoder::with_quality(input, level)
213 }
214
215 fn get_pin_mut(pinned: Pin<&mut Self::Output>) -> Pin<&mut Self::Input> {
216 pinned.get_pin_mut()
217 }
218}
219
220impl<B> DecorateAsyncRead for ZstdEncoder<B>
221where
222 B: Body,
223{
224 type Input = AsyncReadBody<B>;
225 type Output = ZstdEncoder<Self::Input>;
226
227 fn apply(input: Self::Input, quality: CompressionLevel) -> Self::Output {
228 let needs_window_limit = match quality {
239 CompressionLevel::Best => true, CompressionLevel::Precise(level) => level >= 17,
241 _ => false,
242 };
243 if needs_window_limit {
246 let params = [async_compression::zstd::CParameter::window_log(23)];
247 ZstdEncoder::with_quality_and_params(input, quality.into_async_compression(), ¶ms)
248 } else {
249 ZstdEncoder::with_quality(input, quality.into_async_compression())
250 }
251 }
252
253 fn get_pin_mut(pinned: Pin<&mut Self::Output>) -> Pin<&mut Self::Input> {
254 pinned.get_pin_mut()
255 }
256}