ipfs_api_backend_hyper/
backend.rs

// Copyright 2022 rust-ipfs-api Developers
//
// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
// http://opensource.org/licenses/MIT>, at your option. This file may not be
// copied, modified, or distributed except according to those terms.
//
8
9use crate::error::Error;
10use async_trait::async_trait;
11use bytes::Bytes;
12use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
13use http::{
14    header::{HeaderName, HeaderValue},
15    uri::Scheme,
16    StatusCode, Uri,
17};
18use hyper::{
19    body,
20    client::{self, connect::Connect, Builder, HttpConnector},
21};
22use ipfs_api_prelude::{ApiRequest, Backend, BoxStream, TryFromUri};
23use multipart::client::multipart;
24
/// Generates the `HyperBackend` struct plus its `Default` and `TryFromUri`
/// implementations for one concrete default connector type.
///
/// Accepted forms:
///
///   * `impl_default!(Connector)` creates the connector with `<Connector>::new()`.
///   * `impl_default!(Connector, expr)` creates the connector by evaluating `expr`.
///
macro_rules! impl_default {
    // Short form: delegate to the two-argument form with `new()` as constructor.
    ($connector:path) => {
        impl_default!($connector, <$connector>::new());
    };
    ($connector:path, $make_connector:expr) => {
        /// Backend that sends API requests through a Hyper client.
        #[derive(Clone)]
        pub struct HyperBackend<C = $connector>
        where
            C: Connect + Clone + Send + Sync + 'static,
        {
            /// Base URI handed to `ApiRequest::absolute_url` when requests are built.
            base: Uri,

            /// Hyper client used to send every request.
            client: client::Client<C, hyper::Body>,

            /// Username and password
            credentials: Option<(String, String)>,
        }

        impl Default for HyperBackend<$connector> {
            /// Creates a backend for the endpoint returned by `from_ipfs_config`.
            /// When that yields nothing, falls back to `localhost:5001` over HTTP.
            ///
            fn default() -> Self {
                match Self::from_ipfs_config() {
                    Some(backend) => backend,
                    None => Self::from_host_and_port(Scheme::HTTP, "localhost", 5001).unwrap(),
                }
            }
        }

        impl TryFromUri for HyperBackend<$connector> {
            /// Creates a backend without credentials that targets `base`.
            fn build_with_base_uri(base: Uri) -> Self {
                let connector = $make_connector;

                // The per-host idle connection limit of the pool is set to zero.
                let client = Builder::default()
                    .pool_max_idle_per_host(0)
                    .build(connector);

                Self {
                    base,
                    client,
                    credentials: None,
                }
            }
        }
    };
}
68
69// Because the Hyper TLS connector supports both HTTP and HTTPS,
70// if TLS is enabled, always use the TLS connector as default.
71//
72// Otherwise, compile errors will result due to ambiguity:
73//
74//   * "cannot infer type for struct `IpfsClient<_>`"
75//
76#[cfg(not(feature = "with-hyper-tls"))]
77#[cfg(not(feature = "with-hyper-rustls"))]
78impl_default!(HttpConnector);
79
80#[cfg(feature = "with-hyper-tls")]
81impl_default!(hyper_tls::HttpsConnector<HttpConnector>);
82
83#[cfg(feature = "with-hyper-rustls")]
84impl_default!(
85    hyper_rustls::HttpsConnector<HttpConnector>,
86    hyper_rustls::HttpsConnectorBuilder::new()
87        .with_native_roots()
88        .https_or_http()
89        .enable_http1()
90        .build()
91);
92
93impl<C: Connect + Clone + Send + Sync + 'static> HyperBackend<C> {
94    pub fn with_credentials<U, P>(self, username: U, password: P) -> Self
95    where
96        U: Into<String>,
97        P: Into<String>,
98    {
99        Self {
100            base: self.base,
101            client: self.client,
102            credentials: Some((username.into(), password.into())),
103        }
104    }
105
106    fn basic_authorization(&self) -> Option<String> {
107        self.credentials.as_ref().map(|(username, password)| {
108            let credentials = format!("{}:{}", username, password);
109            let encoded = base64::encode(credentials);
110
111            format!("Basic {}", encoded)
112        })
113    }
114}
115
116#[cfg_attr(feature = "with-send-sync", async_trait)]
117#[cfg_attr(not(feature = "with-send-sync"), async_trait(?Send))]
118impl<C> Backend for HyperBackend<C>
119where
120    C: Connect + Clone + Send + Sync + 'static,
121{
122    type HttpRequest = http::Request<hyper::Body>;
123
124    type HttpResponse = http::Response<hyper::Body>;
125
126    type Error = Error;
127
128    fn with_credentials<U, P>(self, username: U, password: P) -> Self
129    where
130        U: Into<String>,
131        P: Into<String>,
132    {
133        (self as HyperBackend<C>).with_credentials(username, password)
134    }
135
136    fn build_base_request<Req>(
137        &self,
138        req: Req,
139        form: Option<multipart::Form<'static>>,
140    ) -> Result<Self::HttpRequest, Error>
141    where
142        Req: ApiRequest,
143    {
144        let url = req.absolute_url(&self.base)?;
145
146        let builder = http::Request::builder();
147        let builder = builder.method(Req::METHOD).uri(url);
148
149        let builder = if let Some(authorization) = self.basic_authorization() {
150            builder.header(hyper::header::AUTHORIZATION, authorization)
151        } else {
152            builder
153        };
154
155        let req = if let Some(form) = form {
156            form.set_body_convert::<hyper::Body, multipart::Body>(builder)
157        } else {
158            builder.body(hyper::Body::empty())
159        }?;
160
161        Ok(req)
162    }
163
164    fn get_header(res: &Self::HttpResponse, key: HeaderName) -> Option<&HeaderValue> {
165        res.headers().get(key)
166    }
167
168    async fn request_raw<Req>(
169        &self,
170        req: Req,
171        form: Option<multipart::Form<'static>>,
172    ) -> Result<(StatusCode, Bytes), Self::Error>
173    where
174        Req: ApiRequest,
175    {
176        let req = self.build_base_request(req, form)?;
177        let res = self.client.request(req).await?;
178        let status = res.status();
179        let body = body::to_bytes(res.into_body()).await?;
180
181        Ok((status, body))
182    }
183
184    fn response_to_byte_stream(res: Self::HttpResponse) -> BoxStream<Bytes, Self::Error> {
185        Box::new(res.into_body().err_into())
186    }
187
188    fn request_stream<Res, F>(
189        &self,
190        req: Self::HttpRequest,
191        process: F,
192    ) -> BoxStream<Res, Self::Error>
193    where
194        F: 'static + Send + Fn(Self::HttpResponse) -> BoxStream<Res, Self::Error>,
195    {
196        let stream = self
197            .client
198            .request(req)
199            .err_into()
200            .map_ok(move |res| {
201                match res.status() {
202                    StatusCode::OK => process(res).right_stream(),
203                    // If the server responded with an error status code, the body
204                    // still needs to be read so an error can be built. This block will
205                    // read the entire body stream, then immediately return an error.
206                    //
207                    _ => body::to_bytes(res.into_body())
208                        .boxed()
209                        .map(|maybe_body| match maybe_body {
210                            Ok(body) => Err(Self::process_error_from_body(body)),
211                            Err(e) => Err(e.into()),
212                        })
213                        .into_stream()
214                        .left_stream(),
215                }
216            })
217            .try_flatten_stream();
218
219        Box::new(stream)
220    }
221}