izihawa_ipfs_api_backend_hyper/
backend.rs

1// Copyright 2021 rust-ipfs-api Developers
2//
3// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5// http://opensource.org/licenses/MIT>, at your option. This file may not be
6// copied, modified, or distributed except according to those terms.
7//
8
9use crate::error::Error;
10use async_trait::async_trait;
11use bytes::Bytes;
12use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
13use http::{
14    header::{HeaderName, HeaderValue},
15    uri::Scheme,
16    StatusCode, Uri,
17};
18use hyper::{
19    body,
20    client::{self, connect::Connect, Builder, HttpConnector},
21};
22use ipfs_api_prelude::{ApiRequest, Backend, BoxStream, TryFromUri};
23use multipart::client::multipart;
24
25#[derive(Clone)]
26pub struct HyperBackend<C = HttpConnector>
27where
28    C: Connect + Clone + Send + Sync + 'static,
29{
30    base: Uri,
31    client: client::Client<C, hyper::Body>,
32}
33
34macro_rules! impl_default {
35    ($http_connector:path) => {
36        impl_default!($http_connector, <$http_connector>::new());
37    };
38    ($http_connector:path, $constructor:expr) => {
39        impl Default for HyperBackend<$http_connector> {
40            /// Creates an `IpfsClient` connected to the endpoint specified in ~/.ipfs/api.
41            /// If not found, tries to connect to `localhost:5001`.
42            ///
43            fn default() -> Self {
44                Self::from_ipfs_config().unwrap_or_else(|| {
45                    Self::from_host_and_port(Scheme::HTTP, "localhost", 5001).unwrap()
46                })
47            }
48        }
49
50        impl TryFromUri for HyperBackend<$http_connector> {
51            fn build_with_base_uri(base: Uri) -> Self {
52                let client = Builder::default()
53                    .pool_max_idle_per_host(0)
54                    .build($constructor);
55
56                HyperBackend { base, client }
57            }
58        }
59    };
60}
61
62impl_default!(HttpConnector);
63
64#[cfg(feature = "with-hyper-tls")]
65impl_default!(hyper_tls::HttpsConnector<HttpConnector>);
66
67#[cfg(feature = "with-hyper-rustls")]
68impl_default!(
69    hyper_rustls::HttpsConnector<HttpConnector>,
70    hyper_rustls::HttpsConnectorBuilder::new()
71        .with_native_roots()
72        .https_or_http()
73        .enable_http1()
74        .build()
75);
76
77#[cfg_attr(feature = "with-send-sync", async_trait)]
78#[cfg_attr(not(feature = "with-send-sync"), async_trait(?Send))]
79impl<C> Backend for HyperBackend<C>
80where
81    C: Connect + Clone + Send + Sync + 'static,
82{
83    type HttpRequest = http::Request<hyper::Body>;
84
85    type HttpResponse = http::Response<hyper::Body>;
86
87    type Error = Error;
88
89    fn build_base_request<Req>(
90        &self,
91        req: Req,
92        form: Option<multipart::Form<'static>>,
93    ) -> Result<Self::HttpRequest, Error>
94    where
95        Req: ApiRequest,
96    {
97        let url = req.absolute_url(&self.base)?;
98
99        let builder = http::Request::builder();
100        let builder = builder.method(Req::METHOD).uri(url);
101
102        let req = if let Some(form) = form {
103            form.set_body_convert::<hyper::Body, multipart::Body>(builder)
104        } else {
105            builder.body(hyper::Body::empty())
106        }?;
107
108        Ok(req)
109    }
110
111    fn get_header(res: &Self::HttpResponse, key: HeaderName) -> Option<&HeaderValue> {
112        res.headers().get(key)
113    }
114
115    async fn request_raw<Req>(
116        &self,
117        req: Req,
118        form: Option<multipart::Form<'static>>,
119    ) -> Result<(StatusCode, Bytes), Self::Error>
120    where
121        Req: ApiRequest,
122    {
123        let req = self.build_base_request(req, form)?;
124        let res = self.client.request(req).await?;
125        let status = res.status();
126        let body = body::to_bytes(res.into_body()).await?;
127
128        Ok((status, body))
129    }
130
131    fn response_to_byte_stream(res: Self::HttpResponse) -> BoxStream<Bytes, Self::Error> {
132        Box::new(res.into_body().err_into())
133    }
134
135    fn request_stream<Res, F>(
136        &self,
137        req: Self::HttpRequest,
138        process: F,
139    ) -> BoxStream<Res, Self::Error>
140    where
141        F: 'static + Send + Fn(Self::HttpResponse) -> BoxStream<Res, Self::Error>,
142    {
143        let stream = self
144            .client
145            .request(req)
146            .err_into()
147            .map_ok(move |res| {
148                match res.status() {
149                    StatusCode::OK => process(res).right_stream(),
150                    // If the server responded with an error status code, the body
151                    // still needs to be read so an error can be built. This block will
152                    // read the entire body stream, then immediately return an error.
153                    //
154                    _ => body::to_bytes(res.into_body())
155                        .boxed()
156                        .map(|maybe_body| match maybe_body {
157                            Ok(body) => Err(Self::process_error_from_body(body)),
158                            Err(e) => Err(e.into()),
159                        })
160                        .into_stream()
161                        .left_stream(),
162                }
163            })
164            .try_flatten_stream();
165
166        Box::new(stream)
167    }
168}