lambda_runtime_api_client/body/
mod.rs

1//! HTTP body utilities. Extracted from Axum under MIT license.
2//! <https://github.com/tokio-rs/axum/blob/main/axum/LICENSE>
3
4use crate::{BoxError, Error};
5use bytes::Bytes;
6use futures_util::stream::Stream;
7use http_body::{Body as _, Frame};
8use http_body_util::{BodyExt, Collected};
9use std::{
10    pin::Pin,
11    task::{Context, Poll},
12};
13
14use self::channel::Sender;
15
/// Evaluates a `std::task::Poll` expression and yields the inner value when it
/// is `Ready`.
///
/// When the expression is `Pending`, the *enclosing function* returns
/// `Poll::Pending` immediately, so this may only be used inside functions whose
/// return type is a `Poll`.
///
/// The macro is declared before the `mod` items of this file, which (by
/// textual macro scoping) makes it visible inside those submodules as well.
macro_rules! ready {
    ($poll:expr) => {
        match $poll {
            // Not ready yet: propagate `Pending` out of the caller.
            std::task::Poll::Pending => return std::task::Poll::Pending,
            // Ready: the macro expression evaluates to the payload.
            std::task::Poll::Ready(value) => value,
        }
    };
}
24
25mod channel;
26pub mod sender;
27mod watch;
28
29type BoxBody = http_body_util::combinators::UnsyncBoxBody<Bytes, Error>;
30
31fn boxed<B>(body: B) -> BoxBody
32where
33    B: http_body::Body<Data = Bytes> + Send + 'static,
34    B::Error: Into<BoxError>,
35{
36    try_downcast(body).unwrap_or_else(|body| body.map_err(Error::new).boxed_unsync())
37}
38
/// Tries to reinterpret a value of type `K` as a value of type `T`.
///
/// Returns `Ok` with the very same value when `K` and `T` are the same
/// concrete type, and gives the untouched input back as `Err` otherwise.
///
/// The value is parked in an `Option` so that it can be moved out through the
/// `&mut dyn Any` view once the runtime type check has succeeded.
pub(crate) fn try_downcast<T, K>(k: K) -> Result<T, K>
where
    T: 'static,
    K: Send + 'static,
{
    let mut slot = Some(k);

    let erased: &mut dyn std::any::Any = &mut slot;
    if let Some(same_type) = erased.downcast_mut::<Option<T>>() {
        // `slot` was just filled above, so `take()` always yields `Some`.
        return Ok(same_type.take().unwrap());
    }

    // The downcast failed, so nothing was moved out of `slot`.
    Err(slot.unwrap())
}
51
52/// The body type used in axum requests and responses.
53#[derive(Debug)]
54pub struct Body(BoxBody);
55
56impl Body {
57    /// Create a new `Body` that wraps another [`http_body::Body`].
58    pub fn new<B>(body: B) -> Self
59    where
60        B: http_body::Body<Data = Bytes> + Send + 'static,
61        B::Error: Into<BoxError>,
62    {
63        try_downcast(body).unwrap_or_else(|body| Self(boxed(body)))
64    }
65
66    /// Create an empty body.
67    pub fn empty() -> Self {
68        Self::new(http_body_util::Empty::new())
69    }
70
71    /// Create a new `Body` stream with associated Sender half.
72    pub fn channel() -> (Sender, Body) {
73        let (sender, body) = channel::channel();
74        (sender, Body::new(body))
75    }
76
77    /// Collect the body into `Bytes`
78    pub async fn collect(self) -> Result<Collected<Bytes>, Error> {
79        self.0.collect().await
80    }
81}
82
83impl Default for Body {
84    fn default() -> Self {
85        Self::empty()
86    }
87}
88
89macro_rules! body_from_impl {
90    ($ty:ty) => {
91        impl From<$ty> for Body {
92            fn from(buf: $ty) -> Self {
93                Self::new(http_body_util::Full::from(buf))
94            }
95        }
96    };
97}
98
99body_from_impl!(&'static [u8]);
100body_from_impl!(std::borrow::Cow<'static, [u8]>);
101body_from_impl!(Vec<u8>);
102
103body_from_impl!(&'static str);
104body_from_impl!(std::borrow::Cow<'static, str>);
105body_from_impl!(String);
106
107body_from_impl!(Bytes);
108
109impl http_body::Body for Body {
110    type Data = Bytes;
111    type Error = Error;
112
113    #[inline]
114    fn poll_frame(
115        mut self: Pin<&mut Self>,
116        cx: &mut Context<'_>,
117    ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
118        Pin::new(&mut self.0).poll_frame(cx)
119    }
120
121    #[inline]
122    fn size_hint(&self) -> http_body::SizeHint {
123        self.0.size_hint()
124    }
125
126    #[inline]
127    fn is_end_stream(&self) -> bool {
128        self.0.is_end_stream()
129    }
130}
131
132impl Stream for Body {
133    type Item = Result<Bytes, Error>;
134
135    #[inline]
136    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
137        match futures_util::ready!(Pin::new(&mut self).poll_frame(cx)?) {
138            Some(frame) => match frame.into_data() {
139                Ok(data) => Poll::Ready(Some(Ok(data))),
140                Err(_frame) => Poll::Ready(None),
141            },
142            None => Poll::Ready(None),
143        }
144    }
145}