Skip to main content

libdd_common/
http_common.rs

1// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
4use core::fmt;
5use std::{convert::Infallible, error::Error as _, task::Poll};
6
7use crate::connector::Connector;
8use http_body_util::BodyExt;
9use hyper::body::Incoming;
10use pin_project::pin_project;
11use thiserror::Error;
12
13/// Create a new default configuration hyper client for fixed interval sending.
14///
15/// This client does not keep connections because otherwise we would get a pipe closed
16/// every second connection because of low keep alive in the agent.
17///
18/// This is on general not a problem if we use the client once every tens of seconds.
19pub fn new_client_periodic() -> GenericHttpClient<Connector> {
20    hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::default())
21        .pool_max_idle_per_host(0)
22        .build(Connector::default())
23}
24
25/// Create a new default configuration hyper client.
26///
27/// It will keep connections open for a longer time and reuse them.
28pub fn new_default_client() -> GenericHttpClient<Connector> {
29    hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::default())
30        .build(Connector::default())
31}
32
/// Coarse classification of a client error, as returned by `ClientError::kind`.
///
/// Most variants mirror one predicate of `hyper::Error`; the mapping lives in the
/// `From<hyper::Error>` impl for `ClientError`.
///
/// The type is `Copy` and comparable, so callers can write
/// `err.kind() == ErrorKind::Timeout` or use it as a map/set key.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum ErrorKind {
    /// The hyper error reported `is_parse()`.
    Parse,
    /// The hyper error reported `is_closed()`; `into_error` also uses this kind for
    /// connect failures.
    Closed,
    /// The hyper error reported `is_canceled()`.
    Canceled,
    /// The hyper error reported `is_incomplete_message()`.
    Incomplete,
    /// The hyper error reported `is_body_write_aborted()`.
    WriteAborted,
    /// The hyper error reported `is_parse_status()`.
    ParseStatus,
    /// The hyper error reported `is_timeout()`.
    Timeout,
    /// Anything that matched none of the other kinds.
    Other,
}
44
45#[derive(Debug, Error)]
46pub struct ClientError {
47    source: anyhow::Error,
48    kind: ErrorKind,
49}
50
51impl std::fmt::Display for ClientError {
52    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53        self.source.fmt(f)
54    }
55}
56
57impl ClientError {
58    pub fn kind(&self) -> ErrorKind {
59        self.kind
60    }
61}
62
63impl From<hyper::Error> for ClientError {
64    fn from(source: hyper::Error) -> Self {
65        use ErrorKind::*;
66        let kind = if source.is_canceled() {
67            Canceled
68        } else if source.is_parse() {
69            Parse
70        } else if source.is_parse_status() {
71            ParseStatus
72        } else if source.is_incomplete_message() {
73            Incomplete
74        } else if source.is_body_write_aborted() {
75            WriteAborted
76        } else if source.is_timeout() {
77            Timeout
78        } else if source.is_closed() {
79            Closed
80        } else {
81            Other
82        };
83        Self {
84            kind,
85            source: source.into(),
86        }
87    }
88}
89
90pub type HttpResponse = http::Response<Body>;
91pub type HttpRequest = http::Request<Body>;
92pub type HttpRequestError = hyper_util::client::legacy::Error;
93
94pub type ResponseFuture = hyper_util::client::legacy::ResponseFuture;
95
96pub fn into_response(response: hyper::Response<Incoming>) -> HttpResponse {
97    response.map(Body::Incoming)
98}
99
100pub fn into_error(err: HttpRequestError) -> ClientError {
101    let kind = if let Some(source) = err.source().and_then(|s| s.downcast_ref::<Error>()) {
102        match source {
103            Error::Client(client_error) => client_error.kind,
104            Error::Other(_) => ErrorKind::Other,
105            Error::Infallible(infallible) => match *infallible {},
106        }
107    } else if err.is_connect() {
108        ErrorKind::Closed
109    } else {
110        ErrorKind::Other
111    };
112    ClientError {
113        source: err.into(),
114        kind,
115    }
116}
117
118pub async fn collect_response_bytes(response: HttpResponse) -> Result<bytes::Bytes, Error> {
119    Ok(response.into_body().collect().await?.to_bytes())
120}
121
122#[derive(Debug)]
123pub enum Error {
124    Client(ClientError),
125    Other(anyhow::Error),
126    Infallible(Infallible),
127}
128
129impl fmt::Display for Error {
130    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
131        match self {
132            Error::Client(e) => write!(f, "client error: {e}"),
133            Error::Infallible(e) => match *e {},
134            Error::Other(e) => write!(f, "other error: {e}"),
135        }
136    }
137}
138
139impl From<std::io::Error> for Error {
140    fn from(value: std::io::Error) -> Self {
141        Self::Other(value.into())
142    }
143}
144
145impl From<http::Error> for Error {
146    fn from(value: http::Error) -> Self {
147        Self::Other(value.into())
148    }
149}
150
151impl std::error::Error for Error {}
152
153pub fn mock_response(
154    builder: http::response::Builder,
155    body: hyper::body::Bytes,
156) -> anyhow::Result<HttpResponse> {
157    Ok(builder.body(Body::from_bytes(body))?)
158}
159
160pub fn empty_response(builder: http::response::Builder) -> Result<HttpResponse, Error> {
161    Ok(builder.body(Body::empty())?)
162}
163
164#[pin_project(project=BodyProj)]
165#[derive(Debug)]
166pub enum Body {
167    Single(#[pin] http_body_util::Full<hyper::body::Bytes>),
168    Empty(#[pin] http_body_util::Empty<hyper::body::Bytes>),
169    Boxed(#[pin] http_body_util::combinators::BoxBody<hyper::body::Bytes, anyhow::Error>),
170    Channel(#[pin] tokio::sync::mpsc::Receiver<hyper::body::Bytes>),
171    Incoming(#[pin] hyper::body::Incoming),
172}
173
174pub struct Sender {
175    tx: tokio::sync::mpsc::Sender<hyper::body::Bytes>,
176}
177
178impl Sender {
179    pub async fn send_data(&self, data: hyper::body::Bytes) -> anyhow::Result<()> {
180        self.tx.send(data).await?;
181        Ok(())
182    }
183}
184
185impl Body {
186    pub fn empty() -> Self {
187        Body::Empty(http_body_util::Empty::new())
188    }
189
190    pub fn from_bytes(bytes: hyper::body::Bytes) -> Self {
191        Body::Single(http_body_util::Full::new(bytes))
192    }
193
194    pub fn boxed<
195        E: std::error::Error + Sync + Send + 'static,
196        T: hyper::body::Body<Data = hyper::body::Bytes, Error = E> + Sync + Send + 'static,
197    >(
198        body: T,
199    ) -> Self {
200        Body::Boxed(body.map_err(anyhow::Error::from).boxed())
201    }
202
203    pub fn channel() -> (Sender, Self) {
204        let (tx, rx) = tokio::sync::mpsc::channel(1);
205        (Sender { tx }, Body::Channel(rx))
206    }
207
208    pub fn incoming(incoming: Incoming) -> Self {
209        Body::Incoming(incoming)
210    }
211}
212
213impl Default for Body {
214    fn default() -> Self {
215        Body::empty()
216    }
217}
218
219impl From<&'static str> for Body {
220    fn from(s: &'static str) -> Self {
221        Body::from_bytes(hyper::body::Bytes::from_static(s.as_bytes()))
222    }
223}
224
225impl From<Vec<u8>> for Body {
226    fn from(s: Vec<u8>) -> Self {
227        Body::from_bytes(hyper::body::Bytes::from(s))
228    }
229}
230
231impl From<String> for Body {
232    fn from(s: String) -> Self {
233        Body::from_bytes(hyper::body::Bytes::from(s))
234    }
235}
236
237impl hyper::body::Body for Body {
238    type Data = hyper::body::Bytes;
239
240    type Error = Error;
241
242    fn poll_frame(
243        self: std::pin::Pin<&mut Self>,
244        cx: &mut std::task::Context<'_>,
245    ) -> std::task::Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
246        match self.project() {
247            BodyProj::Single(pin) => pin.poll_frame(cx).map_err(Error::Infallible),
248            BodyProj::Empty(pin) => pin.poll_frame(cx).map_err(Error::Infallible),
249            BodyProj::Boxed(pin) => pin.poll_frame(cx).map_err(Error::Other),
250            BodyProj::Channel(pin) => {
251                let data = match pin.get_mut().poll_recv(cx) {
252                    Poll::Ready(Some(data)) => data,
253                    Poll::Ready(None) => return Poll::Ready(None),
254                    Poll::Pending => return Poll::Pending,
255                };
256                Poll::Ready(Some(Ok(hyper::body::Frame::data(data))))
257            }
258            BodyProj::Incoming(pin) => pin
259                .poll_frame(cx)
260                .map_err(|e| Error::Client(ClientError::from(e))),
261        }
262    }
263
264    fn is_end_stream(&self) -> bool {
265        match self {
266            Body::Single(body) => body.is_end_stream(),
267            Body::Empty(body) => body.is_end_stream(),
268            Body::Boxed(body) => body.is_end_stream(),
269            Body::Channel(body) => body.is_closed() && body.is_empty(),
270            Body::Incoming(body) => body.is_end_stream(),
271        }
272    }
273
274    fn size_hint(&self) -> http_body::SizeHint {
275        match self {
276            Body::Single(body) => body.size_hint(),
277            Body::Empty(body) => body.size_hint(),
278            Body::Boxed(body) => body.size_hint(),
279            Body::Channel(_) => http_body::SizeHint::default(),
280            Body::Incoming(body) => body.size_hint(),
281        }
282    }
283}
284
285pub type GenericHttpClient<C> = hyper_util::client::legacy::Client<C, Body>;
286
287pub fn client_builder() -> hyper_util::client::legacy::Builder {
288    hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::default())
289}