Skip to main content

libdd_common/
http_common.rs

1// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
4use core::fmt;
5use std::convert::Infallible;
6
7use thiserror::Error;
8
9// --- Portable types (available on all platforms including wasm) ---
10
/// Coarse classification of an HTTP client failure.
///
/// On native targets the kind is picked from the predicates exposed by the underlying
/// `hyper` error (see the `From<hyper::Error>` conversion for [`ClientError`]).
///
/// The type is a plain field-less enum, so it is cheap to copy and can be compared with `==`.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum ErrorKind {
    /// The hyper error reported `is_parse()`.
    Parse,
    /// The hyper error reported `is_closed()`, or the request error reported `is_connect()`.
    Closed,
    /// The hyper error reported `is_canceled()`.
    Canceled,
    /// The hyper error reported `is_incomplete_message()`.
    Incomplete,
    /// The hyper error reported `is_body_write_aborted()`.
    WriteAborted,
    /// The hyper error reported `is_parse_status()`.
    ParseStatus,
    /// The hyper error reported `is_timeout()`.
    Timeout,
    /// None of the more specific kinds applied.
    Other,
}
22
/// Error raised by the HTTP client, pairing the underlying cause with an [`ErrorKind`].
#[derive(Debug, Error)]
pub struct ClientError {
    // Underlying cause. `thiserror` treats a field named `source` as the value returned by
    // `std::error::Error::source()`; the manual `Display` impl forwards to it as well.
    source: anyhow::Error,
    // Classification of the failure, exposed through `kind()`.
    kind: ErrorKind,
}
28
29impl std::fmt::Display for ClientError {
30    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
31        self.source.fmt(f)
32    }
33}
34
35impl ClientError {
36    pub fn kind(&self) -> ErrorKind {
37        self.kind
38    }
39}
40
/// Error type used by the HTTP helpers in this file.
#[derive(Debug)]
pub enum Error {
    /// A client failure that carries an [`ErrorKind`] classification.
    Client(ClientError),
    /// Any other failure; `std::io::Error` and `http::Error` are converted into this variant.
    Other(anyhow::Error),
    /// Holds an [`Infallible`], which has no values, so this variant can never be constructed.
    /// It lets error types of `Infallible` be mapped into [`Error`].
    Infallible(Infallible),
}
47
48impl fmt::Display for Error {
49    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
50        match self {
51            Error::Client(e) => write!(f, "client error: {e}"),
52            Error::Infallible(e) => match *e {},
53            Error::Other(e) => write!(f, "other error: {e}"),
54        }
55    }
56}
57
58impl From<std::io::Error> for Error {
59    fn from(value: std::io::Error) -> Self {
60        Self::Other(value.into())
61    }
62}
63
64impl From<http::Error> for Error {
65    fn from(value: http::Error) -> Self {
66        Self::Other(value.into())
67    }
68}
69
70impl std::error::Error for Error {}
71
72// --- Native-only code (hyper, Body, client builders, etc.) ---
73
74#[cfg(not(target_arch = "wasm32"))]
75mod native {
76    use super::*;
77    use std::error::Error as _;
78    use std::task::Poll;
79
80    use crate::connector::Connector;
81    use http_body_util::BodyExt;
82    use hyper::body::Incoming;
83    use pin_project::pin_project;
84
85    impl From<hyper::Error> for ClientError {
86        fn from(source: hyper::Error) -> Self {
87            use ErrorKind::*;
88            let kind = if source.is_canceled() {
89                Canceled
90            } else if source.is_parse() {
91                Parse
92            } else if source.is_parse_status() {
93                ParseStatus
94            } else if source.is_incomplete_message() {
95                Incomplete
96            } else if source.is_body_write_aborted() {
97                WriteAborted
98            } else if source.is_timeout() {
99                Timeout
100            } else if source.is_closed() {
101                Closed
102            } else {
103                Other
104            };
105            Self {
106                kind,
107                source: source.into(),
108            }
109        }
110    }
111
    /// HTTP response whose payload is this module's [`Body`] type.
    pub type HttpResponse = http::Response<Body>;
    /// HTTP request whose payload is this module's [`Body`] type.
    pub type HttpRequest = http::Request<Body>;
    /// Error type of the `hyper_util` legacy client.
    pub type HttpRequestError = hyper_util::client::legacy::Error;

    /// Response future type of the `hyper_util` legacy client.
    pub type ResponseFuture = hyper_util::client::legacy::ResponseFuture;
117
118    /// Create a new default configuration hyper client for fixed interval sending.
119    ///
120    /// This client does not keep connections because otherwise we would get a pipe closed
121    /// every second connection because of low keep alive in the agent.
122    ///
123    /// This is on general not a problem if we use the client once every tens of seconds.
124    pub fn new_client_periodic() -> GenericHttpClient<Connector> {
125        hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::default())
126            .pool_max_idle_per_host(0)
127            .build(Connector::default())
128    }
129
130    /// Create a new default configuration hyper client.
131    ///
132    /// It will keep connections open for a longer time and reuse them.
133    pub fn new_default_client() -> GenericHttpClient<Connector> {
134        hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::default())
135            .build(Connector::default())
136    }
137
138    pub fn into_response(response: hyper::Response<Incoming>) -> HttpResponse {
139        response.map(Body::Incoming)
140    }
141
142    impl From<HttpRequestError> for ClientError {
143        fn from(err: HttpRequestError) -> Self {
144            let kind = if let Some(source) = err.source().and_then(|s| s.downcast_ref::<Error>()) {
145                match source {
146                    Error::Client(client_error) => client_error.kind,
147                    Error::Other(_) => ErrorKind::Other,
148                    Error::Infallible(infallible) => match *infallible {},
149                }
150            } else if err.is_connect() {
151                ErrorKind::Closed
152            } else {
153                ErrorKind::Other
154            };
155            Self {
156                source: err.into(),
157                kind,
158            }
159        }
160    }
161
162    pub async fn collect_response_bytes(response: HttpResponse) -> Result<bytes::Bytes, Error> {
163        Ok(response.into_body().collect().await?.to_bytes())
164    }
165
166    pub fn mock_response(
167        builder: http::response::Builder,
168        body: hyper::body::Bytes,
169    ) -> anyhow::Result<HttpResponse> {
170        Ok(builder.body(Body::from_bytes(body))?)
171    }
172
173    pub fn empty_response(builder: http::response::Builder) -> Result<HttpResponse, Error> {
174        Ok(builder.body(Body::empty())?)
175    }
176
    /// Body type used for both requests and responses in this module.
    ///
    /// Each variant wraps a different body source; the `hyper::body::Body` implementation
    /// dispatches to whichever one is active.
    #[pin_project(project=BodyProj)]
    #[derive(Debug)]
    pub enum Body {
        /// A single in-memory buffer (built by `Body::from_bytes`).
        Single(#[pin] http_body_util::Full<hyper::body::Bytes>),
        /// A body without any data (built by `Body::empty`).
        Empty(#[pin] http_body_util::Empty<hyper::body::Bytes>),
        /// A type-erased body whose errors are `anyhow::Error` (built by `Body::boxed`).
        Boxed(#[pin] http_body_util::combinators::BoxBody<hyper::body::Bytes, anyhow::Error>),
        /// Receiving end of a channel fed through a `Sender` (built by `Body::channel`).
        Channel(#[pin] tokio::sync::mpsc::Receiver<hyper::body::Bytes>),
        /// A body streamed by hyper (built by `Body::incoming` / `into_response`).
        Incoming(#[pin] hyper::body::Incoming),
    }
186
187    pub struct Sender {
188        tx: tokio::sync::mpsc::Sender<hyper::body::Bytes>,
189    }
190
191    impl Sender {
192        pub async fn send_data(&self, data: hyper::body::Bytes) -> anyhow::Result<()> {
193            self.tx.send(data).await?;
194            Ok(())
195        }
196    }
197
198    impl Body {
199        pub fn empty() -> Self {
200            Body::Empty(http_body_util::Empty::new())
201        }
202
203        pub fn from_bytes(bytes: hyper::body::Bytes) -> Self {
204            Body::Single(http_body_util::Full::new(bytes))
205        }
206
207        pub fn boxed<
208            E: std::error::Error + Sync + Send + 'static,
209            T: hyper::body::Body<Data = hyper::body::Bytes, Error = E> + Sync + Send + 'static,
210        >(
211            body: T,
212        ) -> Self {
213            Body::Boxed(body.map_err(anyhow::Error::from).boxed())
214        }
215
216        pub fn channel() -> (Sender, Self) {
217            let (tx, rx) = tokio::sync::mpsc::channel(1);
218            (Sender { tx }, Body::Channel(rx))
219        }
220
221        pub fn incoming(incoming: Incoming) -> Self {
222            Body::Incoming(incoming)
223        }
224    }
225
226    impl Default for Body {
227        fn default() -> Self {
228            Body::empty()
229        }
230    }
231
232    impl From<&'static str> for Body {
233        fn from(s: &'static str) -> Self {
234            Body::from_bytes(hyper::body::Bytes::from_static(s.as_bytes()))
235        }
236    }
237
238    impl From<Vec<u8>> for Body {
239        fn from(s: Vec<u8>) -> Self {
240            Body::from_bytes(hyper::body::Bytes::from(s))
241        }
242    }
243
244    impl From<String> for Body {
245        fn from(s: String) -> Self {
246            Body::from_bytes(hyper::body::Bytes::from(s))
247        }
248    }
249
250    impl hyper::body::Body for Body {
251        type Data = hyper::body::Bytes;
252
253        type Error = Error;
254
255        fn poll_frame(
256            self: std::pin::Pin<&mut Self>,
257            cx: &mut std::task::Context<'_>,
258        ) -> std::task::Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
259            match self.project() {
260                BodyProj::Single(pin) => pin.poll_frame(cx).map_err(Error::Infallible),
261                BodyProj::Empty(pin) => pin.poll_frame(cx).map_err(Error::Infallible),
262                BodyProj::Boxed(pin) => pin.poll_frame(cx).map_err(Error::Other),
263                BodyProj::Channel(pin) => {
264                    let data = match pin.get_mut().poll_recv(cx) {
265                        Poll::Ready(Some(data)) => data,
266                        Poll::Ready(None) => return Poll::Ready(None),
267                        Poll::Pending => return Poll::Pending,
268                    };
269                    Poll::Ready(Some(Ok(hyper::body::Frame::data(data))))
270                }
271                BodyProj::Incoming(pin) => pin
272                    .poll_frame(cx)
273                    .map_err(|e| Error::Client(ClientError::from(e))),
274            }
275        }
276
277        fn is_end_stream(&self) -> bool {
278            match self {
279                Body::Single(body) => body.is_end_stream(),
280                Body::Empty(body) => body.is_end_stream(),
281                Body::Boxed(body) => body.is_end_stream(),
282                Body::Channel(body) => body.is_closed() && body.is_empty(),
283                Body::Incoming(body) => body.is_end_stream(),
284            }
285        }
286
287        fn size_hint(&self) -> http_body::SizeHint {
288            match self {
289                Body::Single(body) => body.size_hint(),
290                Body::Empty(body) => body.size_hint(),
291                Body::Boxed(body) => body.size_hint(),
292                Body::Channel(_) => http_body::SizeHint::default(),
293                Body::Incoming(body) => body.size_hint(),
294            }
295        }
296    }
297
    /// `hyper_util` legacy client over connector `C` that sends this module's [`Body`] type.
    pub type GenericHttpClient<C> = hyper_util::client::legacy::Client<C, Body>;
299
300    pub fn client_builder() -> hyper_util::client::legacy::Builder {
301        hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::default())
302    }
303}
304
305#[cfg(not(target_arch = "wasm32"))]
306pub use native::*;