lambda_runtime_api_client/body/
mod.rs1use crate::{BoxError, Error};
5use bytes::Bytes;
6use futures_util::stream::Stream;
7use http_body::{Body as _, Frame};
8use http_body_util::{BodyExt, Collected};
9use std::{
10 pin::Pin,
11 task::{Context, Poll},
12};
13
14use self::channel::Sender;
15
/// Unwraps a `Poll::Ready` value, or returns `Poll::Pending` from the
/// enclosing function when the polled expression is not ready yet.
macro_rules! ready {
    ($poll:expr) => {
        match $poll {
            // Not ready: propagate `Pending` out of the caller immediately.
            std::task::Poll::Pending => return std::task::Poll::Pending,
            std::task::Poll::Ready(value) => value,
        }
    };
}
24
25mod channel;
26pub mod sender;
27mod watch;
28
29type BoxBody = http_body_util::combinators::UnsyncBoxBody<Bytes, Error>;
30
31fn boxed<B>(body: B) -> BoxBody
32where
33 B: http_body::Body<Data = Bytes> + Send + 'static,
34 B::Error: Into<BoxError>,
35{
36 try_downcast(body).unwrap_or_else(|body| body.map_err(Error::new).boxed_unsync())
37}
38
/// Converts `k` into a `T` when `K` and `T` are the very same type.
///
/// Returns `Ok` with the value (now typed as `T`) if the types match, and
/// `Err` giving the untouched value back otherwise. The check goes through
/// `std::any::Any`, so no `unsafe` is involved.
pub(crate) fn try_downcast<T, K>(k: K) -> Result<T, K>
where
    T: 'static,
    K: Send + 'static,
{
    // Park the value in an `Option` so it can be moved out through the
    // `&mut` reference that `downcast_mut` hands back.
    let mut slot = Some(k);
    match <dyn std::any::Any>::downcast_mut::<Option<T>>(&mut slot) {
        Some(same_type) => Ok(same_type.take().unwrap()),
        None => Err(slot.unwrap()),
    }
}
51
52#[derive(Debug)]
54pub struct Body(BoxBody);
55
56impl Body {
57 pub fn new<B>(body: B) -> Self
59 where
60 B: http_body::Body<Data = Bytes> + Send + 'static,
61 B::Error: Into<BoxError>,
62 {
63 try_downcast(body).unwrap_or_else(|body| Self(boxed(body)))
64 }
65
66 pub fn empty() -> Self {
68 Self::new(http_body_util::Empty::new())
69 }
70
71 pub fn channel() -> (Sender, Body) {
73 let (sender, body) = channel::channel();
74 (sender, Body::new(body))
75 }
76
77 pub async fn collect(self) -> Result<Collected<Bytes>, Error> {
79 self.0.collect().await
80 }
81}
82
83impl Default for Body {
84 fn default() -> Self {
85 Self::empty()
86 }
87}
88
89macro_rules! body_from_impl {
90 ($ty:ty) => {
91 impl From<$ty> for Body {
92 fn from(buf: $ty) -> Self {
93 Self::new(http_body_util::Full::from(buf))
94 }
95 }
96 };
97}
98
99body_from_impl!(&'static [u8]);
100body_from_impl!(std::borrow::Cow<'static, [u8]>);
101body_from_impl!(Vec<u8>);
102
103body_from_impl!(&'static str);
104body_from_impl!(std::borrow::Cow<'static, str>);
105body_from_impl!(String);
106
107body_from_impl!(Bytes);
108
109impl http_body::Body for Body {
110 type Data = Bytes;
111 type Error = Error;
112
113 #[inline]
114 fn poll_frame(
115 mut self: Pin<&mut Self>,
116 cx: &mut Context<'_>,
117 ) -> Poll<Option<Result<Frame<Self::Data>, Self::Error>>> {
118 Pin::new(&mut self.0).poll_frame(cx)
119 }
120
121 #[inline]
122 fn size_hint(&self) -> http_body::SizeHint {
123 self.0.size_hint()
124 }
125
126 #[inline]
127 fn is_end_stream(&self) -> bool {
128 self.0.is_end_stream()
129 }
130}
131
132impl Stream for Body {
133 type Item = Result<Bytes, Error>;
134
135 #[inline]
136 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
137 match futures_util::ready!(Pin::new(&mut self).poll_frame(cx)?) {
138 Some(frame) => match frame.into_data() {
139 Ok(data) => Poll::Ready(Some(Ok(data))),
140 Err(_frame) => Poll::Ready(None),
141 },
142 None => Poll::Ready(None),
143 }
144 }
145}