rama_http/layer/
map_response_body.rs

1//! Apply a transformation to the response body.
2//!
3//! # Example
4//!
5//! ```
6//! use bytes::Bytes;
7//! use rama_http::{Body, Request, Response};
8//! use rama_http::dep::http_body;
9//! use std::convert::Infallible;
10//! use std::{pin::Pin, task::{Context, Poll}};
11//! use rama_core::{Layer, Service, context};
12//! use rama_core::service::service_fn;
13//! use rama_http::layer::map_response_body::MapResponseBodyLayer;
14//! use rama_core::error::BoxError;
15//! use futures_lite::ready;
16//!
17//! // A wrapper for a `http_body::Body` that prints the size of data chunks
18//! pin_project_lite::pin_project! {
19//!     struct PrintChunkSizesBody<B> {
20//!         #[pin]
21//!         inner: B,
22//!     }
23//! }
24//!
25//! impl<B> PrintChunkSizesBody<B> {
26//!     fn new(inner: B) -> Self {
27//!         Self { inner }
28//!     }
29//! }
30//!
31//! impl<B> http_body::Body for PrintChunkSizesBody<B>
32//!     where B: http_body::Body<Data = Bytes, Error = BoxError>,
33//! {
34//!     type Data = Bytes;
35//!     type Error = BoxError;
36//!
37//!     fn poll_frame(
38//!         mut self: Pin<&mut Self>,
39//!         cx: &mut Context<'_>,
40//!     ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
41//!         let inner_body = self.as_mut().project().inner;
42//!         if let Some(frame) = ready!(inner_body.poll_frame(cx)?) {
43//!             if let Some(chunk) = frame.data_ref() {
44//!                 println!("chunk size = {}", chunk.len());
45//!             } else {
46//!                 eprintln!("no data chunk found");
47//!             }
48//!             Poll::Ready(Some(Ok(frame)))
49//!         } else {
50//!             Poll::Ready(None)
51//!         }
52//!     }
53//!
54//!     fn is_end_stream(&self) -> bool {
55//!         self.inner.is_end_stream()
56//!     }
57//!
58//!     fn size_hint(&self) -> http_body::SizeHint {
59//!         self.inner.size_hint()
60//!     }
61//! }
62//!
63//! async fn handle(_: Request) -> Result<Response, Infallible> {
64//!     // ...
65//!     # Ok(Response::new(Body::default()))
66//! }
67//!
68//! # #[tokio::main]
69//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
70//! let mut svc = (
71//!     // Wrap response bodies in `PrintChunkSizesBody`
72//!     MapResponseBodyLayer::new(PrintChunkSizesBody::new),
73//! ).into_layer(service_fn(handle));
74//!
75//! // Call the service
76//! let request = Request::new(Body::from("foobar"));
77//!
78//! svc.serve(context::Context::default(), request).await?;
79//! # Ok(())
80//! # }
81//! ```
82
83use crate::{Request, Response};
84use rama_core::{Context, Layer, Service};
85use rama_utils::macros::define_inner_service_accessors;
86use std::fmt;
87
88/// Apply a transformation to the response body.
89///
90/// See the [module docs](crate::layer::map_response_body) for an example.
91#[derive(Clone)]
92pub struct MapResponseBodyLayer<F> {
93    f: F,
94}
95
96impl<F> MapResponseBodyLayer<F> {
97    /// Create a new [`MapResponseBodyLayer`].
98    ///
99    /// `F` is expected to be a function that takes a body and returns another body.
100    pub const fn new(f: F) -> Self {
101        Self { f }
102    }
103}
104
105impl<S, F> Layer<S> for MapResponseBodyLayer<F>
106where
107    F: Clone,
108{
109    type Service = MapResponseBody<S, F>;
110
111    fn layer(&self, inner: S) -> Self::Service {
112        MapResponseBody::new(inner, self.f.clone())
113    }
114
115    fn into_layer(self, inner: S) -> Self::Service {
116        MapResponseBody::new(inner, self.f)
117    }
118}
119
120impl<F> fmt::Debug for MapResponseBodyLayer<F> {
121    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
122        f.debug_struct("MapResponseBodyLayer")
123            .field("f", &std::any::type_name::<F>())
124            .finish()
125    }
126}
127
128/// Apply a transformation to the response body.
129///
130/// See the [module docs](crate::layer::map_response_body) for an example.
131#[derive(Clone)]
132pub struct MapResponseBody<S, F> {
133    inner: S,
134    f: F,
135}
136
137impl<S, F> MapResponseBody<S, F> {
138    /// Create a new [`MapResponseBody`].
139    ///
140    /// `F` is expected to be a function that takes a body and returns another body.
141    pub const fn new(service: S, f: F) -> Self {
142        Self { inner: service, f }
143    }
144
145    define_inner_service_accessors!();
146}
147
148impl<F, S, State, ReqBody, ResBody, NewResBody> Service<State, Request<ReqBody>>
149    for MapResponseBody<S, F>
150where
151    S: Service<State, Request<ReqBody>, Response = Response<ResBody>>,
152    State: Clone + Send + Sync + 'static,
153    ReqBody: Send + 'static,
154    ResBody: Send + Sync + 'static,
155    NewResBody: Send + Sync + 'static,
156    F: Fn(ResBody) -> NewResBody + Clone + Send + Sync + 'static,
157{
158    type Response = Response<NewResBody>;
159    type Error = S::Error;
160
161    async fn serve(
162        &self,
163        ctx: Context<State>,
164        req: Request<ReqBody>,
165    ) -> Result<Self::Response, Self::Error> {
166        let res = self.inner.serve(ctx, req).await?;
167        Ok(res.map(self.f.clone()))
168    }
169}
170
171impl<S, F> fmt::Debug for MapResponseBody<S, F>
172where
173    S: fmt::Debug,
174{
175    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
176        f.debug_struct("MapResponseBody")
177            .field("inner", &self.inner)
178            .field("f", &std::any::type_name::<F>())
179            .finish()
180    }
181}