ipfs_api_backend_actix/
backend.rs

1// Copyright 2021 rust-ipfs-api Developers
2//
3// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5// http://opensource.org/licenses/MIT>, at your option. This file may not be
6// copied, modified, or distributed except according to those terms.
7//
8
9use crate::error::Error;
10use async_trait::async_trait;
11use awc::Client;
12use bytes::Bytes;
13use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
14use http::{
15    header::{HeaderName, HeaderValue},
16    uri::Scheme,
17    StatusCode, Uri,
18};
19use ipfs_api_prelude::{ApiRequest, Backend, BoxStream, TryFromUri};
20use multipart::client::multipart;
21use std::time::Duration;
22
23const ACTIX_REQUEST_TIMEOUT: Duration = Duration::from_secs(90);
24
25pub struct ActixBackend {
26    base: Uri,
27    client: Client,
28
29    /// Username and password
30    credentials: Option<(String, String)>,
31}
32
33impl Default for ActixBackend {
34    /// Creates an `IpfsClient` connected to the endpoint specified in ~/.ipfs/api.
35    /// If not found, tries to connect to `localhost:5001`.
36    ///
37    fn default() -> Self {
38        Self::from_ipfs_config()
39            .unwrap_or_else(|| Self::from_host_and_port(Scheme::HTTP, "localhost", 5001).unwrap())
40    }
41}
42
43impl TryFromUri for ActixBackend {
44    fn build_with_base_uri(base: Uri) -> Self {
45        let client = Client::default();
46
47        ActixBackend {
48            base,
49            client,
50            credentials: None,
51        }
52    }
53}
54
55impl ActixBackend {
56    pub fn with_credentials<U, P>(self, username: U, password: P) -> Self
57    where
58        U: Into<String>,
59        P: Into<String>,
60    {
61        Self {
62            base: self.base,
63            client: self.client,
64            credentials: Some((username.into(), password.into())),
65        }
66    }
67}
68
69#[async_trait(?Send)]
70impl Backend for ActixBackend {
71    type HttpRequest = awc::SendClientRequest;
72
73    type HttpResponse = awc::ClientResponse<actix_http::encoding::Decoder<actix_http::Payload>>;
74
75    type Error = Error;
76
77    fn with_credentials<U, P>(self, username: U, password: P) -> Self
78    where
79        U: Into<String>,
80        P: Into<String>,
81    {
82        (self as ActixBackend).with_credentials(username, password)
83    }
84
85    fn build_base_request<Req>(
86        &self,
87        req: Req,
88        form: Option<multipart::Form<'static>>,
89    ) -> Result<Self::HttpRequest, Error>
90    where
91        Req: ApiRequest,
92    {
93        let url = req.absolute_url(&self.base)?;
94        let req = self.client.request(Req::METHOD, url);
95        let req = if let Some((username, password)) = &self.credentials {
96            req.basic_auth(username, password)
97        } else {
98            req
99        };
100        let req = if let Some(form) = form {
101            req.content_type(form.content_type())
102                .send_body(multipart::Body::from(form))
103        } else {
104            req.timeout(ACTIX_REQUEST_TIMEOUT).send()
105        };
106
107        Ok(req)
108    }
109
110    fn get_header(res: &Self::HttpResponse, key: HeaderName) -> Option<&HeaderValue> {
111        res.headers().get(key)
112    }
113
114    async fn request_raw<Req>(
115        &self,
116        req: Req,
117        form: Option<multipart::Form<'static>>,
118    ) -> Result<(StatusCode, Bytes), Self::Error>
119    where
120        Req: ApiRequest,
121    {
122        let req = self.build_base_request(req, form)?;
123        let mut res = req.await?;
124        let status = res.status();
125        let body = res.body().await?;
126
127        // FIXME: Actix compat with bytes 1.0
128        Ok((status, body))
129    }
130
131    fn response_to_byte_stream(res: Self::HttpResponse) -> BoxStream<Bytes, Self::Error> {
132        let stream = res.err_into();
133
134        Box::new(stream)
135    }
136
137    fn request_stream<Res, F>(
138        &self,
139        req: Self::HttpRequest,
140        process: F,
141    ) -> BoxStream<Res, Self::Error>
142    where
143        F: 'static + Send + Fn(Self::HttpResponse) -> BoxStream<Res, Self::Error>,
144    {
145        let stream = req
146            .err_into()
147            .map_ok(move |mut res| {
148                match res.status() {
149                    StatusCode::OK => process(res).right_stream(),
150                    // If the server responded with an error status code, the body
151                    // still needs to be read so an error can be built. This block will
152                    // read the entire body stream, then immediately return an error.
153                    //
154                    _ => res
155                        .body()
156                        .map(|maybe_body| match maybe_body {
157                            Ok(body) => Err(Self::process_error_from_body(body)),
158                            Err(e) => Err(e.into()),
159                        })
160                        .into_stream()
161                        .left_stream(),
162                }
163            })
164            .try_flatten_stream();
165
166        Box::new(stream)
167    }
168}