tower_async_http/map_response_body.rs
1//! Apply a transformation to the response body.
2//!
3//! # Example
4//!
5//! ```
6//! use bytes::Bytes;
7//! use http::{Request, Response};
8//! use http_body::{Body, Frame};
9//! use http_body_util::Full;
10//! use std::convert::Infallible;
11//! use std::{pin::Pin, task::{Context, Poll}};
12//! use tower_async::{ServiceBuilder, service_fn, ServiceExt, Service, BoxError};
13//! use tower_async_http::map_response_body::MapResponseBodyLayer;
14//! use futures::ready;
15//!
16//! // A wrapper for a `http_body::Body` that prints the size of data chunks
17//! pin_project_lite::pin_project! {
18//! struct PrintChunkSizesBody<B> {
19//! #[pin]
20//! inner: B,
21//! }
22//! }
23//!
24//! impl<B> PrintChunkSizesBody<B> {
25//! fn new(inner: B) -> Self {
26//! Self { inner }
27//! }
28//! }
29//!
30//! impl<B> Body for PrintChunkSizesBody<B>
31//! where B: Body<Data = Bytes, Error = BoxError>,
32//! {
33//! type Data = Bytes;
34//! type Error = BoxError;
35//!
36//! fn poll_frame(
37//! mut self: Pin<&mut Self>,
38//! cx: &mut Context<'_>,
39//! ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
40//! let inner_body = self.as_mut().project().inner;
41//! if let Some(frame) = ready!(inner_body.poll_frame(cx)?) {
42//! if let Some(chunk) = frame.data_ref() {
43//! println!("chunk size = {}", chunk.len());
44//! } else {
45//! eprintln!("no data chunk found");
46//! }
47//! Poll::Ready(Some(Ok(frame)))
48//! } else {
49//! Poll::Ready(None)
50//! }
51//! }
52//!
53//! fn is_end_stream(&self) -> bool {
54//! self.inner.is_end_stream()
55//! }
56//!
57//! fn size_hint(&self) -> http_body::SizeHint {
58//! self.inner.size_hint()
59//! }
60//! }
61//!
62//! async fn handle(_: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
63//! // ...
64//! # Ok(Response::new(Full::default()))
65//! }
66//!
67//! # #[tokio::main]
68//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
69//! let mut svc = ServiceBuilder::new()
70//! // Wrap response bodies in `PrintChunkSizesBody`
71//! .layer(MapResponseBodyLayer::new(PrintChunkSizesBody::new))
72//! .service_fn(handle);
73//!
74//! // Call the service
75//! let request = Request::new(Full::from("foobar"));
76//!
77//! svc.call(request).await?;
78//! # Ok(())
79//! # }
80//! ```
81
82use http::{Request, Response};
83use std::fmt;
84use tower_async_layer::Layer;
85use tower_async_service::Service;
86
/// A layer that rewrites response bodies by passing them through a function.
///
/// Each service wrapped by this layer receives its own clone of the function.
/// See the [module docs](crate::map_response_body) for an example.
#[derive(Clone)]
pub struct MapResponseBodyLayer<F> {
    /// The body-mapping function handed to every service this layer produces.
    f: F,
}
94
95impl<F> MapResponseBodyLayer<F> {
96 /// Create a new [`MapResponseBodyLayer`].
97 ///
98 /// `F` is expected to be a function that takes a body and returns another body.
99 pub fn new(f: F) -> Self {
100 Self { f }
101 }
102}
103
104impl<S, F> Layer<S> for MapResponseBodyLayer<F>
105where
106 F: Clone,
107{
108 type Service = MapResponseBody<S, F>;
109
110 fn layer(&self, inner: S) -> Self::Service {
111 MapResponseBody::new(inner, self.f.clone())
112 }
113}
114
115impl<F> fmt::Debug for MapResponseBodyLayer<F> {
116 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
117 f.debug_struct("MapResponseBodyLayer")
118 .field("f", &std::any::type_name::<F>())
119 .finish()
120 }
121}
122
/// Middleware that rewrites the body of every response produced by an inner service.
///
/// See the [module docs](crate::map_response_body) for an example.
#[derive(Clone)]
pub struct MapResponseBody<S, F> {
    /// The wrapped service whose responses are transformed.
    inner: S,
    /// The function applied to each response body.
    f: F,
}
131
132impl<S, F> MapResponseBody<S, F> {
133 /// Create a new [`MapResponseBody`].
134 ///
135 /// `F` is expected to be a function that takes a body and returns another body.
136 pub fn new(service: S, f: F) -> Self {
137 Self { inner: service, f }
138 }
139
140 /// Returns a new [`Layer`] that wraps services with a `MapResponseBodyLayer` middleware.
141 ///
142 /// [`Layer`]: tower_async_layer::Layer
143 pub fn layer(f: F) -> MapResponseBodyLayer<F> {
144 MapResponseBodyLayer::new(f)
145 }
146
147 define_inner_service_accessors!();
148}
149
150impl<F, S, ReqBody, ResBody, NewResBody> Service<Request<ReqBody>> for MapResponseBody<S, F>
151where
152 S: Service<Request<ReqBody>, Response = Response<ResBody>>,
153 F: Fn(ResBody) -> NewResBody + Clone,
154{
155 type Response = Response<NewResBody>;
156 type Error = S::Error;
157
158 async fn call(&self, req: Request<ReqBody>) -> Result<Self::Response, Self::Error> {
159 let res = self.inner.call(req).await?;
160 Ok(res.map(self.f.clone()))
161 }
162}
163
164impl<S, F> fmt::Debug for MapResponseBody<S, F>
165where
166 S: fmt::Debug,
167{
168 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
169 f.debug_struct("MapResponseBody")
170 .field("inner", &self.inner)
171 .field("f", &std::any::type_name::<F>())
172 .finish()
173 }
174}