libdd_common/
hyper_migration.rs1use core::fmt;
5use std::{convert::Infallible, task::Poll};
6
7use crate::connector::Connector;
8use http_body_util::BodyExt;
9use hyper::body::Incoming;
10use pin_project::pin_project;
11use hyper::Request as HyperRequest;
13
14pub fn new_client_periodic() -> GenericHttpClient<Connector> {
21 hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::default())
22 .pool_max_idle_per_host(0)
23 .build(Connector::default())
24}
25
26pub fn new_default_client() -> GenericHttpClient<Connector> {
30 hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::default())
31 .build(Connector::default())
32}
33
/// An HTTP response whose body is this module's [`Body`].
pub type HttpResponse = hyper::Response<Body>;
/// An HTTP request whose body is this module's [`Body`].
pub type HttpRequest = HyperRequest<Body>;
/// The error type of the `hyper_util` legacy client.
pub type ClientError = hyper_util::client::legacy::Error;
/// The response future type of the `hyper_util` legacy client.
pub type ResponseFuture = hyper_util::client::legacy::ResponseFuture;
38
39pub fn into_response(response: hyper::Response<Incoming>) -> HttpResponse {
40 response.map(Body::Incoming)
41}
42
/// Error type used by the HTTP helpers and the [`Body`] implementation in this module.
#[derive(Debug)]
pub enum Error {
    /// An error reported by `hyper`.
    Hyper(hyper::Error),
    /// An error reported by the `hyper_util` legacy client.
    Legacy(hyper_util::client::legacy::Error),
    /// Any other failure, carried as an `anyhow::Error`.
    Other(anyhow::Error),
    /// Wraps [`Infallible`]. Since `Infallible` has no values, this variant can never be
    /// constructed; it lets infallible error types be mapped into this enum.
    Infallible(Infallible),
}
50
51impl fmt::Display for Error {
52 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
53 match self {
54 Error::Hyper(e) => write!(f, "hyper error: {e}"),
55 Error::Legacy(e) => write!(f, "hyper legacy error: {e}"),
56 Error::Infallible(e) => match *e {},
57 Error::Other(e) => write!(f, "other error: {e}"),
58 }
59 }
60}
61
62impl From<hyper_util::client::legacy::Error> for Error {
63 fn from(value: hyper_util::client::legacy::Error) -> Self {
64 Self::Legacy(value)
65 }
66}
67
68impl From<std::io::Error> for Error {
69 fn from(value: std::io::Error) -> Self {
70 Self::Other(value.into())
71 }
72}
73
74impl From<http::Error> for Error {
75 fn from(value: http::Error) -> Self {
76 Self::Other(value.into())
77 }
78}
79
80impl std::error::Error for Error {}
81
82pub fn mock_response(
83 builder: http::response::Builder,
84 body: hyper::body::Bytes,
85) -> anyhow::Result<HttpResponse> {
86 Ok(builder.body(Body::from_bytes(body))?)
87}
88
89pub fn empty_response(builder: http::response::Builder) -> Result<HttpResponse, Error> {
90 Ok(builder.body(Body::empty())?)
91}
92
/// HTTP body type used by this module's requests and responses.
///
/// Each variant wraps a different body source; the `hyper::body::Body` implementation
/// for this type dispatches to the wrapped value.
#[pin_project(project=BodyProj)]
#[derive(Debug)]
pub enum Body {
    /// A body consisting of a single in-memory buffer.
    Single(#[pin] http_body_util::Full<hyper::body::Bytes>),
    /// A body with no data.
    Empty(#[pin] http_body_util::Empty<hyper::body::Bytes>),
    /// A type-erased body whose errors are `anyhow::Error`.
    Boxed(#[pin] http_body_util::combinators::BoxBody<hyper::body::Bytes, anyhow::Error>),
    /// A body whose data chunks are received from a Tokio mpsc channel.
    Channel(#[pin] tokio::sync::mpsc::Receiver<hyper::body::Bytes>),
    /// A body received by hyper.
    Incoming(#[pin] hyper::body::Incoming),
}
102
103pub struct Sender {
104 tx: tokio::sync::mpsc::Sender<hyper::body::Bytes>,
105}
106
107impl Sender {
108 pub async fn send_data(&self, data: hyper::body::Bytes) -> anyhow::Result<()> {
109 self.tx.send(data).await?;
110 Ok(())
111 }
112}
113
114impl Body {
115 pub fn empty() -> Self {
116 Body::Empty(http_body_util::Empty::new())
117 }
118
119 pub fn from_bytes(bytes: hyper::body::Bytes) -> Self {
120 Body::Single(http_body_util::Full::new(bytes))
121 }
122
123 pub fn boxed<
124 E: std::error::Error + Sync + Send + 'static,
125 T: hyper::body::Body<Data = hyper::body::Bytes, Error = E> + Sync + Send + 'static,
126 >(
127 body: T,
128 ) -> Self {
129 Body::Boxed(body.map_err(anyhow::Error::from).boxed())
130 }
131
132 pub fn channel() -> (Sender, Self) {
133 let (tx, rx) = tokio::sync::mpsc::channel(1);
134 (Sender { tx }, Body::Channel(rx))
135 }
136
137 pub fn incoming(incoming: Incoming) -> Self {
138 Body::Incoming(incoming)
139 }
140}
141
142impl Default for Body {
143 fn default() -> Self {
144 Body::empty()
145 }
146}
147
148impl From<&'static str> for Body {
149 fn from(s: &'static str) -> Self {
150 Body::from_bytes(hyper::body::Bytes::from_static(s.as_bytes()))
151 }
152}
153
154impl From<Vec<u8>> for Body {
155 fn from(s: Vec<u8>) -> Self {
156 Body::from_bytes(hyper::body::Bytes::from(s))
157 }
158}
159
160impl From<String> for Body {
161 fn from(s: String) -> Self {
162 Body::from_bytes(hyper::body::Bytes::from(s))
163 }
164}
165
166impl hyper::body::Body for Body {
167 type Data = hyper::body::Bytes;
168
169 type Error = Error;
170
171 fn poll_frame(
172 self: std::pin::Pin<&mut Self>,
173 cx: &mut std::task::Context<'_>,
174 ) -> std::task::Poll<Option<Result<http_body::Frame<Self::Data>, Self::Error>>> {
175 match self.project() {
176 BodyProj::Single(pin) => pin.poll_frame(cx).map_err(Error::Infallible),
177 BodyProj::Empty(pin) => pin.poll_frame(cx).map_err(Error::Infallible),
178 BodyProj::Boxed(pin) => pin.poll_frame(cx).map_err(Error::Other),
179 BodyProj::Channel(pin) => {
180 let data = match pin.get_mut().poll_recv(cx) {
181 Poll::Ready(Some(data)) => data,
182 Poll::Ready(None) => return Poll::Ready(None),
183 Poll::Pending => return Poll::Pending,
184 };
185 Poll::Ready(Some(Ok(hyper::body::Frame::data(data))))
186 }
187 BodyProj::Incoming(pin) => pin.poll_frame(cx).map_err(Error::Hyper),
188 }
189 }
190
191 fn is_end_stream(&self) -> bool {
192 match self {
193 Body::Single(body) => body.is_end_stream(),
194 Body::Empty(body) => body.is_end_stream(),
195 Body::Boxed(body) => body.is_end_stream(),
196 Body::Channel(body) => body.is_closed() && body.is_empty(),
197 Body::Incoming(body) => body.is_end_stream(),
198 }
199 }
200
201 fn size_hint(&self) -> http_body::SizeHint {
202 match self {
203 Body::Single(body) => body.size_hint(),
204 Body::Empty(body) => body.size_hint(),
205 Body::Boxed(body) => body.size_hint(),
206 Body::Channel(_) => http_body::SizeHint::default(),
207 Body::Incoming(body) => body.size_hint(),
208 }
209 }
210}
211
/// A `hyper_util` legacy client over connector `C` whose request body type is [`Body`].
pub type GenericHttpClient<C> = hyper_util::client::legacy::Client<C, Body>;
213
214pub fn client_builder() -> hyper_util::client::legacy::Builder {
215 hyper_util::client::legacy::Client::builder(hyper_util::rt::TokioExecutor::default())
216}