rama_http/layer/map_response_body.rs
1//! Apply a transformation to the response body.
2//!
3//! # Example
4//!
5//! ```
6//! use bytes::Bytes;
7//! use rama_http::{Body, Request, Response};
8//! use rama_http::dep::http_body;
9//! use std::convert::Infallible;
10//! use std::{pin::Pin, task::{Context, Poll}};
11//! use rama_core::{Layer, Service, context};
12//! use rama_core::service::service_fn;
13//! use rama_http::layer::map_response_body::MapResponseBodyLayer;
14//! use rama_core::error::BoxError;
15//! use futures_lite::ready;
16//!
17//! // A wrapper for a `http_body::Body` that prints the size of data chunks
18//! pin_project_lite::pin_project! {
19//! struct PrintChunkSizesBody<B> {
20//! #[pin]
21//! inner: B,
22//! }
23//! }
24//!
25//! impl<B> PrintChunkSizesBody<B> {
26//! fn new(inner: B) -> Self {
27//! Self { inner }
28//! }
29//! }
30//!
31//! impl<B> http_body::Body for PrintChunkSizesBody<B>
32//! where B: http_body::Body<Data = Bytes, Error = BoxError>,
33//! {
34//! type Data = Bytes;
35//! type Error = BoxError;
36//!
37//! fn poll_frame(
38//! mut self: Pin<&mut Self>,
39//! cx: &mut Context<'_>,
40//! ) -> Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
41//! let inner_body = self.as_mut().project().inner;
42//! if let Some(frame) = ready!(inner_body.poll_frame(cx)?) {
43//! if let Some(chunk) = frame.data_ref() {
44//! println!("chunk size = {}", chunk.len());
45//! } else {
46//! eprintln!("no data chunk found");
47//! }
48//! Poll::Ready(Some(Ok(frame)))
49//! } else {
50//! Poll::Ready(None)
51//! }
52//! }
53//!
54//! fn is_end_stream(&self) -> bool {
55//! self.inner.is_end_stream()
56//! }
57//!
58//! fn size_hint(&self) -> http_body::SizeHint {
59//! self.inner.size_hint()
60//! }
61//! }
62//!
63//! async fn handle(_: Request) -> Result<Response, Infallible> {
64//! // ...
65//! # Ok(Response::new(Body::default()))
66//! }
67//!
68//! # #[tokio::main]
69//! # async fn main() -> Result<(), Box<dyn std::error::Error>> {
70//! let mut svc = (
71//! // Wrap response bodies in `PrintChunkSizesBody`
72//! MapResponseBodyLayer::new(PrintChunkSizesBody::new),
73//! ).into_layer(service_fn(handle));
74//!
75//! // Call the service
76//! let request = Request::new(Body::from("foobar"));
77//!
78//! svc.serve(context::Context::default(), request).await?;
79//! # Ok(())
80//! # }
81//! ```
82
83use crate::{Request, Response};
84use rama_core::{Context, Layer, Service};
85use rama_utils::macros::define_inner_service_accessors;
86use std::fmt;
87
/// Layer that applies a transformation to the response body.
///
/// The layer itself only stores the mapping function; its [`Layer`]
/// implementation hands that function to a [`MapResponseBody`] wrapping the
/// inner service.
///
/// See the [module docs](crate::layer::map_response_body) for an example.
#[derive(Clone)]
pub struct MapResponseBodyLayer<F> {
    /// Function handed to every service produced by this layer.
    f: F,
}

impl<F> MapResponseBodyLayer<F> {
    /// Build a [`MapResponseBodyLayer`] around the mapping function `f`.
    ///
    /// `F` is expected to be a function that takes a body and returns another body.
    pub const fn new(f: F) -> Self {
        MapResponseBodyLayer { f }
    }
}
104
105impl<S, F> Layer<S> for MapResponseBodyLayer<F>
106where
107 F: Clone,
108{
109 type Service = MapResponseBody<S, F>;
110
111 fn layer(&self, inner: S) -> Self::Service {
112 MapResponseBody::new(inner, self.f.clone())
113 }
114
115 fn into_layer(self, inner: S) -> Self::Service {
116 MapResponseBody::new(inner, self.f)
117 }
118}
119
120impl<F> fmt::Debug for MapResponseBodyLayer<F> {
121 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
122 f.debug_struct("MapResponseBodyLayer")
123 .field("f", &std::any::type_name::<F>())
124 .finish()
125 }
126}
127
/// Service that applies a transformation to the response body.
///
/// It pairs an inner service with a mapping function; the function is applied
/// to the body of each response the inner service returns.
///
/// See the [module docs](crate::layer::map_response_body) for an example.
#[derive(Clone)]
pub struct MapResponseBody<S, F> {
    /// The wrapped service that produces the original responses.
    inner: S,
    /// Function applied to the body of each response.
    f: F,
}
136
137impl<S, F> MapResponseBody<S, F> {
138 /// Create a new [`MapResponseBody`].
139 ///
140 /// `F` is expected to be a function that takes a body and returns another body.
141 pub const fn new(service: S, f: F) -> Self {
142 Self { inner: service, f }
143 }
144
145 define_inner_service_accessors!();
146}
147
148impl<F, S, State, ReqBody, ResBody, NewResBody> Service<State, Request<ReqBody>>
149 for MapResponseBody<S, F>
150where
151 S: Service<State, Request<ReqBody>, Response = Response<ResBody>>,
152 State: Clone + Send + Sync + 'static,
153 ReqBody: Send + 'static,
154 ResBody: Send + Sync + 'static,
155 NewResBody: Send + Sync + 'static,
156 F: Fn(ResBody) -> NewResBody + Clone + Send + Sync + 'static,
157{
158 type Response = Response<NewResBody>;
159 type Error = S::Error;
160
161 async fn serve(
162 &self,
163 ctx: Context<State>,
164 req: Request<ReqBody>,
165 ) -> Result<Self::Response, Self::Error> {
166 let res = self.inner.serve(ctx, req).await?;
167 Ok(res.map(self.f.clone()))
168 }
169}
170
171impl<S, F> fmt::Debug for MapResponseBody<S, F>
172where
173 S: fmt::Debug,
174{
175 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
176 f.debug_struct("MapResponseBody")
177 .field("inner", &self.inner)
178 .field("f", &std::any::type_name::<F>())
179 .finish()
180 }
181}