libdd_common/
http_common.rs1use core::fmt;
5use std::convert::Infallible;
6
7use thiserror::Error;
8
/// Coarse classification attached to a [`ClientError`].
///
/// The variants mirror the failure predicates this file consults when it
/// converts lower-level HTTP errors; `Other` is the fallback when none of
/// them applies.
///
/// The type is `Copy` and comparable, so callers can write
/// `err.kind() == ErrorKind::Timeout` instead of pattern matching.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorKind {
    /// An HTTP parse failure.
    Parse,
    /// The connection was closed (also used for connect failures).
    Closed,
    /// The operation was canceled.
    Canceled,
    /// The message ended before it was complete.
    Incomplete,
    /// Writing the body was aborted.
    WriteAborted,
    /// A parse failure specific to the response status.
    ParseStatus,
    /// The operation timed out.
    Timeout,
    /// Anything that does not fit the categories above.
    Other,
}
22
23#[derive(Debug, Error)]
24pub struct ClientError {
25 source: anyhow::Error,
26 kind: ErrorKind,
27}
28
29impl std::fmt::Display for ClientError {
30 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31 self.source.fmt(f)
32 }
33}
34
35impl ClientError {
36 pub fn kind(&self) -> ErrorKind {
37 self.kind
38 }
39}
40
41#[derive(Debug)]
42pub enum Error {
43 Client(ClientError),
44 Other(anyhow::Error),
45 Infallible(Infallible),
46}
47
48impl fmt::Display for Error {
49 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50 match self {
51 Error::Client(e) => write!(f, "client error: {e}"),
52 Error::Infallible(e) => match *e {},
53 Error::Other(e) => write!(f, "other error: {e}"),
54 }
55 }
56}
57
58impl From<std::io::Error> for Error {
59 fn from(value: std::io::Error) -> Self {
60 Self::Other(value.into())
61 }
62}
63
64impl From<http::Error> for Error {
65 fn from(value: http::Error) -> Self {
66 Self::Other(value.into())
67 }
68}
69
70impl std::error::Error for Error {}
71
72#[cfg(not(target_arch = "wasm32"))]
75mod native {
76 use super::*;
77 use std::error::Error as _;
78 use std::task::Poll;
79
80 use crate::connector::Connector;
81 use http_body_util::BodyExt;
82 use hyper::body::Incoming;
83 use pin_project::pin_project;
84
85 impl From<hyper::Error> for ClientError {
86 fn from(source: hyper::Error) -> Self {
87 use ErrorKind::*;
88 let kind = if source.is_canceled() {
89 Canceled
90 } else if source.is_parse() {
91 Parse
92 } else if source.is_parse_status() {
93 ParseStatus
94 } else if source.is_incomplete_message() {
95 Incomplete
96 } else if source.is_body_write_aborted() {
97 WriteAborted
98 } else if source.is_timeout() {
99 Timeout
100 } else if source.is_closed() {
101 Closed
102 } else {
103 Other
104 };
105 Self {
106 kind,
107 source: source.into(),
108 }
109 }
110 }
111
112 pub type HttpResponse = http::Response<Body>;
113 pub type HttpRequest = http::Request<Body>;
114 pub type HttpRequestError = hyper_util::client::legacy::Error;
115
116 pub type ResponseFuture = hyper_util::client::legacy::ResponseFuture;
117
118 pub fn new_client_periodic() -> GenericHttpClient<Connector> {
125 hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::default())
126 .pool_max_idle_per_host(0)
127 .build(Connector::default())
128 }
129
130 pub fn new_default_client() -> GenericHttpClient<Connector> {
134 hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::default())
135 .build(Connector::default())
136 }
137
138 pub fn into_response(response: hyper::Response<Incoming>) -> HttpResponse {
139 response.map(Body::Incoming)
140 }
141
142 impl From<HttpRequestError> for ClientError {
143 fn from(err: HttpRequestError) -> Self {
144 let kind = if let Some(source) = err.source().and_then(|s| s.downcast_ref::<Error>()) {
145 match source {
146 Error::Client(client_error) => client_error.kind,
147 Error::Other(_) => ErrorKind::Other,
148 Error::Infallible(infallible) => match *infallible {},
149 }
150 } else if err.is_connect() {
151 ErrorKind::Closed
152 } else {
153 ErrorKind::Other
154 };
155 Self {
156 source: err.into(),
157 kind,
158 }
159 }
160 }
161
162 pub async fn collect_response_bytes(response: HttpResponse) -> Result<bytes::Bytes, Error> {
163 Ok(response.into_body().collect().await?.to_bytes())
164 }
165
166 pub fn mock_response(
167 builder: http::response::Builder,
168 body: hyper::body::Bytes,
169 ) -> anyhow::Result<HttpResponse> {
170 Ok(builder.body(Body::from_bytes(body))?)
171 }
172
173 pub fn empty_response(builder: http::response::Builder) -> Result<HttpResponse, Error> {
174 Ok(builder.body(Body::empty())?)
175 }
176
177 #[pin_project(project=BodyProj)]
178 #[derive(Debug)]
179 pub enum Body {
180 Single(#[pin] http_body_util::Full<hyper::body::Bytes>),
181 Empty(#[pin] http_body_util::Empty<hyper::body::Bytes>),
182 Boxed(#[pin] http_body_util::combinators::BoxBody<hyper::body::Bytes, anyhow::Error>),
183 Channel(#[pin] tokio::sync::mpsc::Receiver<hyper::body::Bytes>),
184 Incoming(#[pin] hyper::body::Incoming),
185 }
186
187 pub struct Sender {
188 tx: tokio::sync::mpsc::Sender<hyper::body::Bytes>,
189 }
190
191 impl Sender {
192 pub async fn send_data(&self, data: hyper::body::Bytes) -> anyhow::Result<()> {
193 self.tx.send(data).await?;
194 Ok(())
195 }
196 }
197
198 impl Body {
199 pub fn empty() -> Self {
200 Body::Empty(http_body_util::Empty::new())
201 }
202
203 pub fn from_bytes(bytes: hyper::body::Bytes) -> Self {
204 Body::Single(http_body_util::Full::new(bytes))
205 }
206
207 pub fn boxed<
208 E: std::error::Error + Sync + Send + 'static,
209 T: hyper::body::Body<Data = hyper::body::Bytes, Error = E> + Sync + Send + 'static,
210 >(
211 body: T,
212 ) -> Self {
213 Body::Boxed(body.map_err(anyhow::Error::from).boxed())
214 }
215
216 pub fn channel() -> (Sender, Self) {
217 let (tx, rx) = tokio::sync::mpsc::channel(1);
218 (Sender { tx }, Body::Channel(rx))
219 }
220
221 pub fn incoming(incoming: Incoming) -> Self {
222 Body::Incoming(incoming)
223 }
224 }
225
226 impl Default for Body {
227 fn default() -> Self {
228 Body::empty()
229 }
230 }
231
232 impl From<&'static str> for Body {
233 fn from(s: &'static str) -> Self {
234 Body::from_bytes(hyper::body::Bytes::from_static(s.as_bytes()))
235 }
236 }
237
238 impl From<Vec<u8>> for Body {
239 fn from(s: Vec<u8>) -> Self {
240 Body::from_bytes(hyper::body::Bytes::from(s))
241 }
242 }
243
244 impl From<String> for Body {
245 fn from(s: String) -> Self {
246 Body::from_bytes(hyper::body::Bytes::from(s))
247 }
248 }
249
250 impl hyper::body::Body for Body {
251 type Data = hyper::body::Bytes;
252
253 type Error = Error;
254
255 fn poll_frame(
256 self: std::pin::Pin<&mut Self>,
257 cx: &mut std::task::Context<'_>,
258 ) -> std::task::Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
259 match self.project() {
260 BodyProj::Single(pin) => pin.poll_frame(cx).map_err(Error::Infallible),
261 BodyProj::Empty(pin) => pin.poll_frame(cx).map_err(Error::Infallible),
262 BodyProj::Boxed(pin) => pin.poll_frame(cx).map_err(Error::Other),
263 BodyProj::Channel(pin) => {
264 let data = match pin.get_mut().poll_recv(cx) {
265 Poll::Ready(Some(data)) => data,
266 Poll::Ready(None) => return Poll::Ready(None),
267 Poll::Pending => return Poll::Pending,
268 };
269 Poll::Ready(Some(Ok(hyper::body::Frame::data(data))))
270 }
271 BodyProj::Incoming(pin) => pin
272 .poll_frame(cx)
273 .map_err(|e| Error::Client(ClientError::from(e))),
274 }
275 }
276
277 fn is_end_stream(&self) -> bool {
278 match self {
279 Body::Single(body) => body.is_end_stream(),
280 Body::Empty(body) => body.is_end_stream(),
281 Body::Boxed(body) => body.is_end_stream(),
282 Body::Channel(body) => body.is_closed() && body.is_empty(),
283 Body::Incoming(body) => body.is_end_stream(),
284 }
285 }
286
287 fn size_hint(&self) -> http_body::SizeHint {
288 match self {
289 Body::Single(body) => body.size_hint(),
290 Body::Empty(body) => body.size_hint(),
291 Body::Boxed(body) => body.size_hint(),
292 Body::Channel(_) => http_body::SizeHint::default(),
293 Body::Incoming(body) => body.size_hint(),
294 }
295 }
296 }
297
298 pub type GenericHttpClient<C> = hyper_util::client::legacy::Client<C, Body>;
299
300 pub fn client_builder() -> hyper_util::client::legacy::Builder {
301 hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::default())
302 }
303}
304
305#[cfg(not(target_arch = "wasm32"))]
306pub use native::*;