tower_async_http/
map_response_body.rs

1//! Apply a transformation to the response body.
2//!
3//! # Example
4//!
5//! ```
6//! use bytes::Bytes;
7//! use http::{Request, Response};
8//! use http_body::{Body, Frame};
9//! use http_body_util::Full;
10//! use std::convert::Infallible;
11//! use std::{pin::Pin, task::{Context, Poll}};
12//! use tower_async::{ServiceBuilder, service_fn, ServiceExt, Service, BoxError};
13//! use tower_async_http::map_response_body::MapResponseBodyLayer;
14//! use futures::ready;
15//!
16//! // A wrapper for a `http_body::Body` that prints the size of data chunks
17//! pin_project_lite::pin_project! {
18//!     struct PrintChunkSizesBody<B> {
19//!         #[pin]
20//!         inner: B,
21//!     }
22//! }
23//!
24//! impl<B> PrintChunkSizesBody<B> {
25//!     fn new(inner: B) -> Self {
26//!         Self { inner }
27//!     }
28//! }
29//!
30//! impl<B> Body for PrintChunkSizesBody<B>
31//!     where B: Body<Data = Bytes, Error = BoxError>,
32//! {
33//!     type Data = Bytes;
34//!     type Error = BoxError;
35//!
36//!     fn poll_frame(
37//!         mut self: Pin<&mut Self>,
38//!         cx: &mut Context<'_>,
39//!     ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
40//!         let inner_body = self.as_mut().project().inner;
41//!         if let Some(frame) = ready!(inner_body.poll_frame(cx)?) {
42//!             if let Some(chunk) = frame.data_ref() {
43//!                 println!("chunk size = {}", chunk.len());
44//!             } else {
45//!                 eprintln!("no data chunk found");
46//!             }
47//!             Poll::Ready(Some(Ok(frame)))
48//!         } else {
49//!             Poll::Ready(None)
50//!         }
51//!     }
52//!
53//!     fn is_end_stream(&self) -> bool {
54//!         self.inner.is_end_stream()
55//!     }
56//!
57//!     fn size_hint(&self) -> http_body::SizeHint {
58//!         self.inner.size_hint()
59//!     }
60//! }
61//!
62//! async fn handle(_: Request<Full<Bytes>>) -> Result<Response<Full<Bytes>>, Infallible> {
63//!     // ...
64//!     # Ok(Response::new(Full::default()))
65//! }
66//!
67//! # #[tokio::main]
68//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
69//! let mut svc = ServiceBuilder::new()
70//!     // Wrap response bodies in `PrintChunkSizesBody`
71//!     .layer(MapResponseBodyLayer::new(PrintChunkSizesBody::new))
72//!     .service_fn(handle);
73//!
74//! // Call the service
75//! let request = Request::new(Full::from("foobar"));
76//!
77//! svc.call(request).await?;
78//! # Ok(())
79//! # }
80//! ```
81
82use http::{Request, Response};
83use std::fmt;
84use tower_async_layer::Layer;
85use tower_async_service::Service;
86
87/// Apply a transformation to the response body.
88///
89/// See the [module docs](crate::map_response_body) for an example.
90#[derive(Clone)]
91pub struct MapResponseBodyLayer<F> {
92    f: F,
93}
94
95impl<F> MapResponseBodyLayer<F> {
96    /// Create a new [`MapResponseBodyLayer`].
97    ///
98    /// `F` is expected to be a function that takes a body and returns another body.
99    pub fn new(f: F) -> Self {
100        Self { f }
101    }
102}
103
104impl<S, F> Layer<S> for MapResponseBodyLayer<F>
105where
106    F: Clone,
107{
108    type Service = MapResponseBody<S, F>;
109
110    fn layer(&self, inner: S) -> Self::Service {
111        MapResponseBody::new(inner, self.f.clone())
112    }
113}
114
115impl<F> fmt::Debug for MapResponseBodyLayer<F> {
116    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
117        f.debug_struct("MapResponseBodyLayer")
118            .field("f", &std::any::type_name::<F>())
119            .finish()
120    }
121}
122
123/// Apply a transformation to the response body.
124///
125/// See the [module docs](crate::map_response_body) for an example.
126#[derive(Clone)]
127pub struct MapResponseBody<S, F> {
128    inner: S,
129    f: F,
130}
131
132impl<S, F> MapResponseBody<S, F> {
133    /// Create a new [`MapResponseBody`].
134    ///
135    /// `F` is expected to be a function that takes a body and returns another body.
136    pub fn new(service: S, f: F) -> Self {
137        Self { inner: service, f }
138    }
139
140    /// Returns a new [`Layer`] that wraps services with a `MapResponseBodyLayer` middleware.
141    ///
142    /// [`Layer`]: tower_async_layer::Layer
143    pub fn layer(f: F) -> MapResponseBodyLayer<F> {
144        MapResponseBodyLayer::new(f)
145    }
146
147    define_inner_service_accessors!();
148}
149
150impl<F, S, ReqBody, ResBody, NewResBody> Service<Request<ReqBody>> for MapResponseBody<S, F>
151where
152    S: Service<Request<ReqBody>, Response = Response<ResBody>>,
153    F: Fn(ResBody) -> NewResBody + Clone,
154{
155    type Response = Response<NewResBody>;
156    type Error = S::Error;
157
158    async fn call(&self, req: Request<ReqBody>) -> Result<Self::Response, Self::Error> {
159        let res = self.inner.call(req).await?;
160        Ok(res.map(self.f.clone()))
161    }
162}
163
164impl<S, F> fmt::Debug for MapResponseBody<S, F>
165where
166    S: fmt::Debug,
167{
168    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
169        f.debug_struct("MapResponseBody")
170            .field("inner", &self.inner)
171            .field("f", &std::any::type_name::<F>())
172            .finish()
173    }
174}