ipfs_api_backend_hyper/
backend.rs1use crate::error::Error;
10use async_trait::async_trait;
11use bytes::Bytes;
12use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
13use http::{
14 header::{HeaderName, HeaderValue},
15 uri::Scheme,
16 StatusCode, Uri,
17};
18use hyper::{
19 body,
20 client::{self, connect::Connect, Builder, HttpConnector},
21};
22use ipfs_api_prelude::{ApiRequest, Backend, BoxStream, TryFromUri};
23use multipart::client::multipart;
24
// Generates the `HyperBackend` struct together with its `Default` and
// `TryFromUri` impls for one concrete connector type.
//
// * `impl_default!(Connector)` constructs the connector via `<Connector>::new()`.
// * `impl_default!(Connector, expr)` constructs the connector via `expr`.
macro_rules! impl_default {
    ($http_connector:path) => {
        impl_default!($http_connector, <$http_connector>::new());
    };
    ($http_connector:path, $constructor:expr) => {
        #[derive(Clone)]
        pub struct HyperBackend<C = $http_connector>
        where
            C: Connect + Clone + Send + Sync + 'static,
        {
            /// Base URI that request URLs are resolved against.
            base: Uri,

            /// Hyper client used to send every request.
            client: client::Client<C, hyper::Body>,

            /// Optional `(username, password)` pair used for HTTP basic auth.
            credentials: Option<(String, String)>,
        }

        impl Default for HyperBackend<$http_connector> {
            /// Returns the backend produced by `from_ipfs_config()` when there is
            /// one, otherwise a backend for plain HTTP on `localhost:5001`.
            ///
            /// # Panics
            ///
            /// Panics if the `localhost:5001` fallback cannot be constructed.
            fn default() -> Self {
                match Self::from_ipfs_config() {
                    Some(configured) => configured,
                    None => Self::from_host_and_port(Scheme::HTTP, "localhost", 5001).unwrap(),
                }
            }
        }

        impl TryFromUri for HyperBackend<$http_connector> {
            /// Creates a backend for `base` with a fresh client and no credentials.
            fn build_with_base_uri(base: Uri) -> Self {
                // The pool is configured with a maximum of zero idle
                // connections per host.
                let client = Builder::default()
                    .pool_max_idle_per_host(0)
                    .build($constructor);

                Self {
                    base,
                    client,
                    credentials: None,
                }
            }
        }
    };
}
68
69#[cfg(not(feature = "with-hyper-tls"))]
77#[cfg(not(feature = "with-hyper-rustls"))]
78impl_default!(HttpConnector);
79
80#[cfg(feature = "with-hyper-tls")]
81impl_default!(hyper_tls::HttpsConnector<HttpConnector>);
82
83#[cfg(feature = "with-hyper-rustls")]
84impl_default!(
85 hyper_rustls::HttpsConnector<HttpConnector>,
86 hyper_rustls::HttpsConnectorBuilder::new()
87 .with_native_roots()
88 .https_or_http()
89 .enable_http1()
90 .build()
91);
92
93impl<C: Connect + Clone + Send + Sync + 'static> HyperBackend<C> {
94 pub fn with_credentials<U, P>(self, username: U, password: P) -> Self
95 where
96 U: Into<String>,
97 P: Into<String>,
98 {
99 Self {
100 base: self.base,
101 client: self.client,
102 credentials: Some((username.into(), password.into())),
103 }
104 }
105
106 fn basic_authorization(&self) -> Option<String> {
107 self.credentials.as_ref().map(|(username, password)| {
108 let credentials = format!("{}:{}", username, password);
109 let encoded = base64::encode(credentials);
110
111 format!("Basic {}", encoded)
112 })
113 }
114}
115
116#[cfg_attr(feature = "with-send-sync", async_trait)]
117#[cfg_attr(not(feature = "with-send-sync"), async_trait(?Send))]
118impl<C> Backend for HyperBackend<C>
119where
120 C: Connect + Clone + Send + Sync + 'static,
121{
122 type HttpRequest = http::Request<hyper::Body>;
123
124 type HttpResponse = http::Response<hyper::Body>;
125
126 type Error = Error;
127
128 fn with_credentials<U, P>(self, username: U, password: P) -> Self
129 where
130 U: Into<String>,
131 P: Into<String>,
132 {
133 (self as HyperBackend<C>).with_credentials(username, password)
134 }
135
136 fn build_base_request<Req>(
137 &self,
138 req: Req,
139 form: Option<multipart::Form<'static>>,
140 ) -> Result<Self::HttpRequest, Error>
141 where
142 Req: ApiRequest,
143 {
144 let url = req.absolute_url(&self.base)?;
145
146 let builder = http::Request::builder();
147 let builder = builder.method(Req::METHOD).uri(url);
148
149 let builder = if let Some(authorization) = self.basic_authorization() {
150 builder.header(hyper::header::AUTHORIZATION, authorization)
151 } else {
152 builder
153 };
154
155 let req = if let Some(form) = form {
156 form.set_body_convert::<hyper::Body, multipart::Body>(builder)
157 } else {
158 builder.body(hyper::Body::empty())
159 }?;
160
161 Ok(req)
162 }
163
164 fn get_header(res: &Self::HttpResponse, key: HeaderName) -> Option<&HeaderValue> {
165 res.headers().get(key)
166 }
167
168 async fn request_raw<Req>(
169 &self,
170 req: Req,
171 form: Option<multipart::Form<'static>>,
172 ) -> Result<(StatusCode, Bytes), Self::Error>
173 where
174 Req: ApiRequest,
175 {
176 let req = self.build_base_request(req, form)?;
177 let res = self.client.request(req).await?;
178 let status = res.status();
179 let body = body::to_bytes(res.into_body()).await?;
180
181 Ok((status, body))
182 }
183
184 fn response_to_byte_stream(res: Self::HttpResponse) -> BoxStream<Bytes, Self::Error> {
185 Box::new(res.into_body().err_into())
186 }
187
188 fn request_stream<Res, F>(
189 &self,
190 req: Self::HttpRequest,
191 process: F,
192 ) -> BoxStream<Res, Self::Error>
193 where
194 F: 'static + Send + Fn(Self::HttpResponse) -> BoxStream<Res, Self::Error>,
195 {
196 let stream = self
197 .client
198 .request(req)
199 .err_into()
200 .map_ok(move |res| {
201 match res.status() {
202 StatusCode::OK => process(res).right_stream(),
203 _ => body::to_bytes(res.into_body())
208 .boxed()
209 .map(|maybe_body| match maybe_body {
210 Ok(body) => Err(Self::process_error_from_body(body)),
211 Err(e) => Err(e.into()),
212 })
213 .into_stream()
214 .left_stream(),
215 }
216 })
217 .try_flatten_stream();
218
219 Box::new(stream)
220 }
221}