libdd_common/
hyper_migration.rs

1// Copyright 2021-Present Datadog, Inc. https://www.datadoghq.com/
2// SPDX-License-Identifier: Apache-2.0
3
4use core::fmt;
5use std::{convert::Infallible, task::Poll};
6
7use crate::connector::Connector;
8use http_body_util::BodyExt;
9use hyper::body::Incoming;
10use pin_project::pin_project;
// Need aliases because cbindgen is not smart enough to figure out type aliases
12use hyper::Request as HyperRequest;
13
14/// Create a new default configuration hyper client for fixed interval sending.
15///
16/// This client does not keep connections because otherwise we would get a pipe closed
17/// every second connection because of low keep alive in the agent.
18///
19/// This is on general not a problem if we use the client once every tens of seconds.
20pub fn new_client_periodic() -> GenericHttpClient<Connector> {
21    hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::default())
22        .pool_max_idle_per_host(0)
23        .build(Connector::default())
24}
25
26/// Create a new default configuration hyper client.
27///
28/// It will keep connections open for a longer time and reuse them.
29pub fn new_default_client() -> GenericHttpClient<Connector> {
30    hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::default())
31        .build(Connector::default())
32}
33
/// HTTP response whose payload is this module's [`Body`].
pub type HttpResponse = hyper::Response<Body>;
/// HTTP request whose payload is this module's [`Body`].
///
/// Spelled through the `HyperRequest` import alias (see the cbindgen note on that import).
pub type HttpRequest = HyperRequest<Body>;
/// Error type of the `hyper_util` legacy client.
pub type ClientError = hyper_util::client::legacy::Error;
/// Response future type of the `hyper_util` legacy client.
pub type ResponseFuture = hyper_util::client::legacy::ResponseFuture;
38
39pub fn into_response(response: hyper::Response<Incoming>) -> HttpResponse {
40    response.map(Body::Incoming)
41}
42
/// Error type used by this module; it is the `Error` associated type of the
/// `hyper::body::Body` implementation for [`Body`].
#[derive(Debug)]
pub enum Error {
    /// Error reported by hyper (produced when polling a `Body::Incoming`).
    Hyper(hyper::Error),
    /// Error reported by the `hyper_util` legacy client.
    Legacy(hyper_util::client::legacy::Error),
    /// Any other failure, carried as an `anyhow::Error` (I/O errors, `http` errors,
    /// errors coming out of a boxed body).
    Other(anyhow::Error),
    /// Placeholder for bodies whose error type is `Infallible`; since `Infallible`
    /// has no values, this variant can never actually be constructed.
    Infallible(Infallible),
}
50
51impl fmt::Display for Error {
52    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53        match self {
54            Error::Hyper(e) => write!(f, "hyper error: {e}"),
55            Error::Legacy(e) => write!(f, "hyper legacy error: {e}"),
56            Error::Infallible(e) => match *e {},
57            Error::Other(e) => write!(f, "other error: {e}"),
58        }
59    }
60}
61
62impl From<hyper_util::client::legacy::Error> for Error {
63    fn from(value: hyper_util::client::legacy::Error) -> Self {
64        Self::Legacy(value)
65    }
66}
67
68impl From<std::io::Error> for Error {
69    fn from(value: std::io::Error) -> Self {
70        Self::Other(value.into())
71    }
72}
73
74impl From<http::Error> for Error {
75    fn from(value: http::Error) -> Self {
76        Self::Other(value.into())
77    }
78}
79
80impl std::error::Error for Error {}
81
82pub fn mock_response(
83    builder: http::response::Builder,
84    body: hyper::body::Bytes,
85) -> anyhow::Result<HttpResponse> {
86    Ok(builder.body(Body::from_bytes(body))?)
87}
88
89pub fn empty_response(builder: http::response::Builder) -> Result<HttpResponse, Error> {
90    Ok(builder.body(Body::empty())?)
91}
92
/// HTTP body type used for the requests and responses of this module.
///
/// Each variant wraps a different kind of body source. `#[pin_project]` generates the
/// `BodyProj` projection that `poll_frame` matches on to reach the pinned inner value.
#[pin_project(project=BodyProj)]
#[derive(Debug)]
pub enum Body {
    /// Body made of one in-memory buffer (see `Body::from_bytes`).
    Single(#[pin] http_body_util::Full<hyper::body::Bytes>),
    /// Body without any data (see `Body::empty`).
    Empty(#[pin] http_body_util::Empty<hyper::body::Bytes>),
    /// Type-erased body whose errors are `anyhow::Error` (see `Body::boxed`).
    Boxed(#[pin] http_body_util::combinators::BoxBody<hyper::body::Bytes, anyhow::Error>),
    /// Body fed chunk by chunk through a tokio mpsc channel (see `Body::channel`).
    Channel(#[pin] tokio::sync::mpsc::Receiver<hyper::body::Bytes>),
    /// Body received from hyper (see `Body::incoming`).
    Incoming(#[pin] hyper::body::Incoming),
}
102
103pub struct Sender {
104    tx: tokio::sync::mpsc::Sender<hyper::body::Bytes>,
105}
106
107impl Sender {
108    pub async fn send_data(&self, data: hyper::body::Bytes) -> anyhow::Result<()> {
109        self.tx.send(data).await?;
110        Ok(())
111    }
112}
113
114impl Body {
115    pub fn empty() -> Self {
116        Body::Empty(http_body_util::Empty::new())
117    }
118
119    pub fn from_bytes(bytes: hyper::body::Bytes) -> Self {
120        Body::Single(http_body_util::Full::new(bytes))
121    }
122
123    pub fn boxed<
124        E: std::error::Error + Sync + Send + 'static,
125        T: hyper::body::Body<Data = hyper::body::Bytes, Error = E> + Sync + Send + 'static,
126    >(
127        body: T,
128    ) -> Self {
129        Body::Boxed(body.map_err(anyhow::Error::from).boxed())
130    }
131
132    pub fn channel() -> (Sender, Self) {
133        let (tx, rx) = tokio::sync::mpsc::channel(1);
134        (Sender { tx }, Body::Channel(rx))
135    }
136
137    pub fn incoming(incoming: Incoming) -> Self {
138        Body::Incoming(incoming)
139    }
140}
141
142impl Default for Body {
143    fn default() -> Self {
144        Body::empty()
145    }
146}
147
148impl From<&'static str> for Body {
149    fn from(s: &'static str) -> Self {
150        Body::from_bytes(hyper::body::Bytes::from_static(s.as_bytes()))
151    }
152}
153
154impl From<Vec<u8>> for Body {
155    fn from(s: Vec<u8>) -> Self {
156        Body::from_bytes(hyper::body::Bytes::from(s))
157    }
158}
159
160impl From<String> for Body {
161    fn from(s: String) -> Self {
162        Body::from_bytes(hyper::body::Bytes::from(s))
163    }
164}
165
166impl hyper::body::Body for Body {
167    type Data = hyper::body::Bytes;
168
169    type Error = Error;
170
171    fn poll_frame(
172        self: std::pin::Pin<&mut Self>,
173        cx: &mut std::task::Context<'_>,
174    ) -> std::task::Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
175        match self.project() {
176            BodyProj::Single(pin) => pin.poll_frame(cx).map_err(Error::Infallible),
177            BodyProj::Empty(pin) => pin.poll_frame(cx).map_err(Error::Infallible),
178            BodyProj::Boxed(pin) => pin.poll_frame(cx).map_err(Error::Other),
179            BodyProj::Channel(pin) => {
180                let data = match pin.get_mut().poll_recv(cx) {
181                    Poll::Ready(Some(data)) => data,
182                    Poll::Ready(None) => return Poll::Ready(None),
183                    Poll::Pending => return Poll::Pending,
184                };
185                Poll::Ready(Some(Ok(hyper::body::Frame::data(data))))
186            }
187            BodyProj::Incoming(pin) => pin.poll_frame(cx).map_err(Error::Hyper),
188        }
189    }
190
191    fn is_end_stream(&self) -> bool {
192        match self {
193            Body::Single(body) => body.is_end_stream(),
194            Body::Empty(body) => body.is_end_stream(),
195            Body::Boxed(body) => body.is_end_stream(),
196            Body::Channel(body) => body.is_closed() && body.is_empty(),
197            Body::Incoming(body) => body.is_end_stream(),
198        }
199    }
200
201    fn size_hint(&self) -> http_body::SizeHint {
202        match self {
203            Body::Single(body) => body.size_hint(),
204            Body::Empty(body) => body.size_hint(),
205            Body::Boxed(body) => body.size_hint(),
206            Body::Channel(_) => http_body::SizeHint::default(),
207            Body::Incoming(body) => body.size_hint(),
208        }
209    }
210}
211
/// `hyper_util` legacy client over connector `C`, using this module's [`Body`] as the
/// request body type.
pub type GenericHttpClient<C> = hyper_util::client::legacy::Client<C, Body>;
213
214pub fn client_builder() -> hyper_util::client::legacy::Builder {
215    hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::default())
216}