libdd_common/
http_common.rs1use core::fmt;
5use std::{convert::Infallible, error::Error as _, task::Poll};
6
7use crate::connector::Connector;
8use http_body_util::BodyExt;
9use hyper::body::Incoming;
10use pin_project::pin_project;
11use thiserror::Error;
12
13pub fn new_client_periodic() -> GenericHttpClient<Connector> {
20 hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::default())
21 .pool_max_idle_per_host(0)
22 .build(Connector::default())
23}
24
25pub fn new_default_client() -> GenericHttpClient<Connector> {
29 hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::default())
30 .build(Connector::default())
31}
32
/// Coarse classification attached to every [`ClientError`].
///
/// Most variants mirror one of the `hyper::Error::is_*` predicates; the
/// mapping is performed when a `hyper::Error` is converted into a
/// [`ClientError`].
///
/// The type is `Copy` and comparable (`PartialEq`/`Eq`/`Hash`), so callers can
/// write `err.kind() == ErrorKind::Timeout` or use kinds as map/set keys.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ErrorKind {
    /// Corresponds to `hyper::Error::is_parse`.
    Parse,
    /// Corresponds to `hyper::Error::is_closed`; also used for connection
    /// failures reported by the client.
    Closed,
    /// Corresponds to `hyper::Error::is_canceled`.
    Canceled,
    /// Corresponds to `hyper::Error::is_incomplete_message`.
    Incomplete,
    /// Corresponds to `hyper::Error::is_body_write_aborted`.
    WriteAborted,
    /// Corresponds to `hyper::Error::is_parse_status`.
    ParseStatus,
    /// Corresponds to `hyper::Error::is_timeout`.
    Timeout,
    /// Fallback for anything that matches none of the other kinds.
    Other,
}
44
45#[derive(Debug, Error)]
46pub struct ClientError {
47 source: anyhow::Error,
48 kind: ErrorKind,
49}
50
51impl std::fmt::Display for ClientError {
52 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53 self.source.fmt(f)
54 }
55}
56
57impl ClientError {
58 pub fn kind(&self) -> ErrorKind {
59 self.kind
60 }
61}
62
63impl From<hyper::Error> for ClientError {
64 fn from(source: hyper::Error) -> Self {
65 use ErrorKind::*;
66 let kind = if source.is_canceled() {
67 Canceled
68 } else if source.is_parse() {
69 Parse
70 } else if source.is_parse_status() {
71 ParseStatus
72 } else if source.is_incomplete_message() {
73 Incomplete
74 } else if source.is_body_write_aborted() {
75 WriteAborted
76 } else if source.is_timeout() {
77 Timeout
78 } else if source.is_closed() {
79 Closed
80 } else {
81 Other
82 };
83 Self {
84 kind,
85 source: source.into(),
86 }
87 }
88}
89
90pub type HttpResponse = http::Response<Body>;
91pub type HttpRequest = http::Request<Body>;
92pub type HttpRequestError = hyper_util::client::legacy::Error;
93
94pub type ResponseFuture = hyper_util::client::legacy::ResponseFuture;
95
96pub fn into_response(response: hyper::Response<Incoming>) -> HttpResponse {
97 response.map(Body::Incoming)
98}
99
100pub fn into_error(err: HttpRequestError) -> ClientError {
101 let kind = if let Some(source) = err.source().and_then(|s| s.downcast_ref::<Error>()) {
102 match source {
103 Error::Client(client_error) => client_error.kind,
104 Error::Other(_) => ErrorKind::Other,
105 Error::Infallible(infallible) => match *infallible {},
106 }
107 } else if err.is_connect() {
108 ErrorKind::Closed
109 } else {
110 ErrorKind::Other
111 };
112 ClientError {
113 source: err.into(),
114 kind,
115 }
116}
117
118pub async fn collect_response_bytes(response: HttpResponse) -> Result<bytes::Bytes, Error> {
119 Ok(response.into_body().collect().await?.to_bytes())
120}
121
122#[derive(Debug)]
123pub enum Error {
124 Client(ClientError),
125 Other(anyhow::Error),
126 Infallible(Infallible),
127}
128
129impl fmt::Display for Error {
130 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131 match self {
132 Error::Client(e) => write!(f, "client error: {e}"),
133 Error::Infallible(e) => match *e {},
134 Error::Other(e) => write!(f, "other error: {e}"),
135 }
136 }
137}
138
139impl From<std::io::Error> for Error {
140 fn from(value: std::io::Error) -> Self {
141 Self::Other(value.into())
142 }
143}
144
145impl From<http::Error> for Error {
146 fn from(value: http::Error) -> Self {
147 Self::Other(value.into())
148 }
149}
150
151impl std::error::Error for Error {}
152
153pub fn mock_response(
154 builder: http::response::Builder,
155 body: hyper::body::Bytes,
156) -> anyhow::Result<HttpResponse> {
157 Ok(builder.body(Body::from_bytes(body))?)
158}
159
160pub fn empty_response(builder: http::response::Builder) -> Result<HttpResponse, Error> {
161 Ok(builder.body(Body::empty())?)
162}
163
164#[pin_project(project=BodyProj)]
165#[derive(Debug)]
166pub enum Body {
167 Single(#[pin] http_body_util::Full<hyper::body::Bytes>),
168 Empty(#[pin] http_body_util::Empty<hyper::body::Bytes>),
169 Boxed(#[pin] http_body_util::combinators::BoxBody<hyper::body::Bytes, anyhow::Error>),
170 Channel(#[pin] tokio::sync::mpsc::Receiver<hyper::body::Bytes>),
171 Incoming(#[pin] hyper::body::Incoming),
172}
173
174pub struct Sender {
175 tx: tokio::sync::mpsc::Sender<hyper::body::Bytes>,
176}
177
178impl Sender {
179 pub async fn send_data(&self, data: hyper::body::Bytes) -> anyhow::Result<()> {
180 self.tx.send(data).await?;
181 Ok(())
182 }
183}
184
185impl Body {
186 pub fn empty() -> Self {
187 Body::Empty(http_body_util::Empty::new())
188 }
189
190 pub fn from_bytes(bytes: hyper::body::Bytes) -> Self {
191 Body::Single(http_body_util::Full::new(bytes))
192 }
193
194 pub fn boxed<
195 E: std::error::Error + Sync + Send + 'static,
196 T: hyper::body::Body<Data = hyper::body::Bytes, Error = E> + Sync + Send + 'static,
197 >(
198 body: T,
199 ) -> Self {
200 Body::Boxed(body.map_err(anyhow::Error::from).boxed())
201 }
202
203 pub fn channel() -> (Sender, Self) {
204 let (tx, rx) = tokio::sync::mpsc::channel(1);
205 (Sender { tx }, Body::Channel(rx))
206 }
207
208 pub fn incoming(incoming: Incoming) -> Self {
209 Body::Incoming(incoming)
210 }
211}
212
213impl Default for Body {
214 fn default() -> Self {
215 Body::empty()
216 }
217}
218
219impl From<&'static str> for Body {
220 fn from(s: &'static str) -> Self {
221 Body::from_bytes(hyper::body::Bytes::from_static(s.as_bytes()))
222 }
223}
224
225impl From<Vec<u8>> for Body {
226 fn from(s: Vec<u8>) -> Self {
227 Body::from_bytes(hyper::body::Bytes::from(s))
228 }
229}
230
231impl From<String> for Body {
232 fn from(s: String) -> Self {
233 Body::from_bytes(hyper::body::Bytes::from(s))
234 }
235}
236
237impl hyper::body::Body for Body {
238 type Data = hyper::body::Bytes;
239
240 type Error = Error;
241
242 fn poll_frame(
243 self: std::pin::Pin<&mut Self>,
244 cx: &mut std::task::Context<'_>,
245 ) -> std::task::Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
246 match self.project() {
247 BodyProj::Single(pin) => pin.poll_frame(cx).map_err(Error::Infallible),
248 BodyProj::Empty(pin) => pin.poll_frame(cx).map_err(Error::Infallible),
249 BodyProj::Boxed(pin) => pin.poll_frame(cx).map_err(Error::Other),
250 BodyProj::Channel(pin) => {
251 let data = match pin.get_mut().poll_recv(cx) {
252 Poll::Ready(Some(data)) => data,
253 Poll::Ready(None) => return Poll::Ready(None),
254 Poll::Pending => return Poll::Pending,
255 };
256 Poll::Ready(Some(Ok(hyper::body::Frame::data(data))))
257 }
258 BodyProj::Incoming(pin) => pin
259 .poll_frame(cx)
260 .map_err(|e| Error::Client(ClientError::from(e))),
261 }
262 }
263
264 fn is_end_stream(&self) -> bool {
265 match self {
266 Body::Single(body) => body.is_end_stream(),
267 Body::Empty(body) => body.is_end_stream(),
268 Body::Boxed(body) => body.is_end_stream(),
269 Body::Channel(body) => body.is_closed() && body.is_empty(),
270 Body::Incoming(body) => body.is_end_stream(),
271 }
272 }
273
274 fn size_hint(&self) -> http_body::SizeHint {
275 match self {
276 Body::Single(body) => body.size_hint(),
277 Body::Empty(body) => body.size_hint(),
278 Body::Boxed(body) => body.size_hint(),
279 Body::Channel(_) => http_body::SizeHint::default(),
280 Body::Incoming(body) => body.size_hint(),
281 }
282 }
283}
284
285pub type GenericHttpClient<C> = hyper_util::client::legacy::Client<C, Body>;
286
287pub fn client_builder() -> hyper_util::client::legacy::Builder {
288 hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::default())
289}