tower_async_http/decompression/
mod.rs

1//! Middleware that decompresses request and response bodies.
2//!
3//! # Examples
4//!
5//! #### Request
6//! ```rust
7//! use bytes::{Bytes, BytesMut};
8//! use flate2::{write::GzEncoder, Compression};
9//! use http::{header, HeaderValue, Request, Response};
10//! use http_body_util::{Full, BodyExt};
11//! use std::{error::Error, io::Write};
12//! use tower_async::{Service, ServiceBuilder, service_fn, ServiceExt};
13//! use tower_async_http::{BoxError, decompression::{DecompressionBody, RequestDecompressionLayer}};
14//!
15//! # #[tokio::main]
16//! # async fn main() -> Result<(), BoxError> {
17//! // A request encoded with gzip coming from some HTTP client.
18//! let mut encoder = GzEncoder::new(Vec::new(), Compression::default());
19//! encoder.write_all(b"Hello?")?;
20//! let request = Request::builder()
21//!     .header(header::CONTENT_ENCODING, "gzip")
22//!     .body(Full::from(encoder.finish()?))?;
23//!
24//! // Our HTTP server
25//! let mut server = ServiceBuilder::new()
26//!     // Automatically decompress request bodies.
27//!     .layer(RequestDecompressionLayer::new())
28//!     .service(service_fn(handler));
29//!
30//! // Send the request, with the gzip encoded body, to our server.
31//! let _response = server.call(request).await?;
32//!
33//! // Handler receives request whose body is decoded when read
34//! async fn handler(mut req: Request<DecompressionBody<Full<Bytes>>>) -> Result<Response<Full<Bytes>>, BoxError>{
35//!     let data = req.into_body().collect().await?.to_bytes();
36//!     assert_eq!(&data[..], b"Hello?");
37//!     Ok(Response::new(Full::from("Hello, World!")))
38//! }
39//! # Ok(())
40//! # }
41//! ```
42//!
43//! #### Response
44//! ```rust
45//! use bytes::{Bytes, BytesMut};
46//! use http::{Request, Response};
47//! use http_body_util::{BodyExt, Full};
48//! use std::convert::Infallible;
49//! use tower_async::{Service, ServiceExt, ServiceBuilder, service_fn};
50//! use tower_async_http::{compression::Compression, decompression::DecompressionLayer, BoxError};
51//! #
52//! # #[tokio::main]
53//! # async fn main() -> Result<(), tower_async_http::BoxError> {
54//! # async fn handle(req: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
55//! #     let body = Full::from("Hello, World!");
56//! #     Ok(Response::new(body))
57//! # }
58//!
59//! // Some opaque service that applies compression.
60//! let service = Compression::new(service_fn(handle));
61//!
62//! // Our HTTP client.
63//! let mut client = ServiceBuilder::new()
64//!     // Automatically decompress response bodies.
65//!     .layer(DecompressionLayer::new())
66//!     .service(service);
67//!
68//! // Call the service.
69//! //
70//! // `DecompressionLayer` takes care of setting `Accept-Encoding`.
71//! let request = Request::new(Full::<Bytes>::default());
72//!
73//! let response = client
74//!     .call(request)
75//!     .await?;
76//!
77//! // Read the body
78//! let body = response.into_body();
79//! let bytes = body.collect().await?.to_bytes().to_vec();
80//! let body = String::from_utf8(bytes).map_err(Into::<BoxError>::into)?;
81//!
82//! assert_eq!(body, "Hello, World!");
83//! #
84//! # Ok(())
85//! # }
86//! ```
87
88mod request;
89
90mod body;
91mod layer;
92mod service;
93
94pub use self::{body::DecompressionBody, layer::DecompressionLayer, service::Decompression};
95
96pub use self::request::layer::RequestDecompressionLayer;
97pub use self::request::service::RequestDecompression;
98
99#[cfg(test)]
100mod tests {
101    use super::*;
102
103    use std::convert::Infallible;
104    use std::io::Write;
105
106    use crate::test_helpers::Body;
107    use crate::{compression::Compression, test_helpers::WithTrailers};
108
109    use flate2::write::GzEncoder;
110    use http::{HeaderMap, HeaderName, Request, Response};
111    use http_body_util::BodyExt;
112    use tower_async::{service_fn, Service};
113
114    #[tokio::test]
115    async fn works() {
116        let client = Decompression::new(Compression::new(service_fn(handle)));
117
118        let req = Request::builder()
119            .header("accept-encoding", "gzip")
120            .body(Body::empty())
121            .unwrap();
122        let res = client.call(req).await.unwrap();
123
124        // read the body, it will be decompressed automatically
125        let body = res.into_body();
126        let collected = body.collect().await.unwrap();
127        let trailers = collected.trailers().cloned().unwrap();
128        let decompressed_data = String::from_utf8(collected.to_bytes().to_vec()).unwrap();
129
130        assert_eq!(decompressed_data, "Hello, World!");
131
132        // maintains trailers
133        assert_eq!(trailers["foo"], "bar");
134    }
135
136    async fn handle(_req: Request<Body>) -> Result<Response<WithTrailers<Body>>, Infallible> {
137        let mut trailers = HeaderMap::new();
138        trailers.insert(HeaderName::from_static("foo"), "bar".parse().unwrap());
139        let body = Body::from("Hello, World!").with_trailers(trailers);
140        Ok(Response::builder().body(body).unwrap())
141    }
142
143    #[tokio::test]
144    async fn decompress_multi_gz() {
145        let client = Decompression::new(service_fn(handle_multi_gz));
146
147        let req = Request::builder()
148            .header("accept-encoding", "gzip")
149            .body(Body::empty())
150            .unwrap();
151        let res = client.call(req).await.unwrap();
152
153        // read the body, it will be decompressed automatically
154        let body = res.into_body();
155        let decompressed_data =
156            String::from_utf8(body.collect().await.unwrap().to_bytes().to_vec()).unwrap();
157
158        assert_eq!(decompressed_data, "Hello, World!");
159    }
160
161    async fn handle_multi_gz(_req: Request<Body>) -> Result<Response<Body>, Infallible> {
162        let mut buf = Vec::new();
163        let mut enc1 = GzEncoder::new(&mut buf, Default::default());
164        enc1.write_all(b"Hello, ").unwrap();
165        enc1.finish().unwrap();
166
167        let mut enc2 = GzEncoder::new(&mut buf, Default::default());
168        enc2.write_all(b"World!").unwrap();
169        enc2.finish().unwrap();
170
171        let mut res = Response::new(Body::from(buf));
172        res.headers_mut()
173            .insert("content-encoding", "gzip".parse().unwrap());
174        Ok(res)
175    }
176
177    #[allow(dead_code)]
178    async fn is_compatible_with_hyper() {
179        use hyper_util::{client::legacy::Client, rt::TokioExecutor};
180        use tower_async::ServiceBuilder;
181        use tower_async_bridge::AsyncServiceExt;
182
183        let client = Client::builder(TokioExecutor::new())
184            .build_http()
185            .into_async();
186        let client = ServiceBuilder::new()
187            .layer(DecompressionLayer::new())
188            .service(client);
189
190        let req = Request::new(Body::default());
191
192        let _: Response<DecompressionBody<_>> = client.call(req).await.unwrap();
193    }
194}