izihawa_ipfs_api_backend_hyper/
backend.rs1use crate::error::Error;
10use async_trait::async_trait;
11use bytes::Bytes;
12use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
13use http::{
14 header::{HeaderName, HeaderValue},
15 uri::Scheme,
16 StatusCode, Uri,
17};
18use hyper::{
19 body,
20 client::{self, connect::Connect, Builder, HttpConnector},
21};
22use ipfs_api_prelude::{ApiRequest, Backend, BoxStream, TryFromUri};
23use multipart::client::multipart;
24
25#[derive(Clone)]
26pub struct HyperBackend<C = HttpConnector>
27where
28 C: Connect + Clone + Send + Sync + 'static,
29{
30 base: Uri,
31 client: client::Client<C, hyper::Body>,
32}
33
34macro_rules! impl_default {
35 ($http_connector:path) => {
36 impl_default!($http_connector, <$http_connector>::new());
37 };
38 ($http_connector:path, $constructor:expr) => {
39 impl Default for HyperBackend<$http_connector> {
40 fn default() -> Self {
44 Self::from_ipfs_config().unwrap_or_else(|| {
45 Self::from_host_and_port(Scheme::HTTP, "localhost", 5001).unwrap()
46 })
47 }
48 }
49
50 impl TryFromUri for HyperBackend<$http_connector> {
51 fn build_with_base_uri(base: Uri) -> Self {
52 let client = Builder::default()
53 .pool_max_idle_per_host(0)
54 .build($constructor);
55
56 HyperBackend { base, client }
57 }
58 }
59 };
60}
61
62impl_default!(HttpConnector);
63
64#[cfg(feature = "with-hyper-tls")]
65impl_default!(hyper_tls::HttpsConnector<HttpConnector>);
66
67#[cfg(feature = "with-hyper-rustls")]
68impl_default!(
69 hyper_rustls::HttpsConnector<HttpConnector>,
70 hyper_rustls::HttpsConnectorBuilder::new()
71 .with_native_roots()
72 .https_or_http()
73 .enable_http1()
74 .build()
75);
76
77#[cfg_attr(feature = "with-send-sync", async_trait)]
78#[cfg_attr(not(feature = "with-send-sync"), async_trait(?Send))]
79impl<C> Backend for HyperBackend<C>
80where
81 C: Connect + Clone + Send + Sync + 'static,
82{
83 type HttpRequest = http::Request<hyper::Body>;
84
85 type HttpResponse = http::Response<hyper::Body>;
86
87 type Error = Error;
88
89 fn build_base_request<Req>(
90 &self,
91 req: Req,
92 form: Option<multipart::Form<'static>>,
93 ) -> Result<Self::HttpRequest, Error>
94 where
95 Req: ApiRequest,
96 {
97 let url = req.absolute_url(&self.base)?;
98
99 let builder = http::Request::builder();
100 let builder = builder.method(Req::METHOD).uri(url);
101
102 let req = if let Some(form) = form {
103 form.set_body_convert::<hyper::Body, multipart::Body>(builder)
104 } else {
105 builder.body(hyper::Body::empty())
106 }?;
107
108 Ok(req)
109 }
110
111 fn get_header(res: &Self::HttpResponse, key: HeaderName) -> Option<&HeaderValue> {
112 res.headers().get(key)
113 }
114
115 async fn request_raw<Req>(
116 &self,
117 req: Req,
118 form: Option<multipart::Form<'static>>,
119 ) -> Result<(StatusCode, Bytes), Self::Error>
120 where
121 Req: ApiRequest,
122 {
123 let req = self.build_base_request(req, form)?;
124 let res = self.client.request(req).await?;
125 let status = res.status();
126 let body = body::to_bytes(res.into_body()).await?;
127
128 Ok((status, body))
129 }
130
131 fn response_to_byte_stream(res: Self::HttpResponse) -> BoxStream<Bytes, Self::Error> {
132 Box::new(res.into_body().err_into())
133 }
134
135 fn request_stream<Res, F>(
136 &self,
137 req: Self::HttpRequest,
138 process: F,
139 ) -> BoxStream<Res, Self::Error>
140 where
141 F: 'static + Send + Fn(Self::HttpResponse) -> BoxStream<Res, Self::Error>,
142 {
143 let stream = self
144 .client
145 .request(req)
146 .err_into()
147 .map_ok(move |res| {
148 match res.status() {
149 StatusCode::OK => process(res).right_stream(),
150 _ => body::to_bytes(res.into_body())
155 .boxed()
156 .map(|maybe_body| match maybe_body {
157 Ok(body) => Err(Self::process_error_from_body(body)),
158 Err(e) => Err(e.into()),
159 })
160 .into_stream()
161 .left_stream(),
162 }
163 })
164 .try_flatten_stream();
165
166 Box::new(stream)
167 }
168}