ipfs_api_prelude/
backend.rs

1// Copyright 2021 rust-ipfs-api Developers
2//
3// Licensed under the Apache License, Version 2.0, <LICENSE-APACHE or
4// http://apache.org/licenses/LICENSE-2.0> or the MIT license <LICENSE-MIT or
5// http://opensource.org/licenses/MIT>, at your option. This file may not be
6// copied, modified, or distributed except according to those terms.
7//
8
9use crate::{
10    header::{TRAILER, X_STREAM_ERROR_KEY},
11    read::{JsonLineDecoder, StreamReader},
12    ApiError, ApiRequest,
13};
14use async_trait::async_trait;
15use bytes::Bytes;
16use common_multipart_rfc7578::client::multipart;
17use futures::{future, FutureExt, Stream, TryStreamExt};
18use http::{
19    header::{HeaderName, HeaderValue},
20    StatusCode,
21};
22use serde::Deserialize;
23use std::fmt::Display;
24use tokio_util::codec::{Decoder, FramedRead};
25
26cfg_if::cfg_if! {
27    if #[cfg(feature = "with-send-sync")] {
28        pub type BoxStream<T, E> = Box<dyn Stream<Item = Result<T, E>> + Send + Unpin>;
29    } else {
30        pub type BoxStream<T, E> = Box<dyn Stream<Item = Result<T, E>> + Unpin>;
31    }
32}
33
34#[cfg_attr(feature = "with-send-sync", async_trait)]
35#[cfg_attr(not(feature = "with-send-sync"), async_trait(?Send))]
36pub trait Backend {
37    cfg_if::cfg_if! {
38        if #[cfg(feature = "with-send-sync")] {
39            type HttpRequest: Send;
40        } else {
41            type HttpRequest;
42        }
43    }
44
45    type HttpResponse;
46
47    cfg_if::cfg_if! {
48        if #[cfg(feature = "with-send-sync")] {
49            type Error: Display + From<ApiError> + From<crate::Error> + Send + 'static;
50        } else {
51            type Error: Display + From<ApiError> + From<crate::Error> + 'static;
52        }
53    }
54
55    /// Builds the url for an api call.
56    ///
57    fn build_base_request<Req>(
58        &self,
59        req: Req,
60        form: Option<multipart::Form<'static>>,
61    ) -> Result<Self::HttpRequest, Self::Error>
62    where
63        Req: ApiRequest;
64
65    /// Get the value of a header from an HTTP response.
66    ///
67    fn get_header(res: &Self::HttpResponse, key: HeaderName) -> Option<&HeaderValue>;
68
69    /// Generates a request, and returns the unprocessed response future.
70    ///
71    async fn request_raw<Req>(
72        &self,
73        req: Req,
74        form: Option<multipart::Form<'static>>,
75    ) -> Result<(StatusCode, Bytes), Self::Error>
76    where
77        Req: ApiRequest;
78
79    fn response_to_byte_stream(res: Self::HttpResponse) -> BoxStream<Bytes, Self::Error>;
80
81    /// Generic method for making a request that expects back a streaming
82    /// response.
83    ///
84    fn request_stream<Res, F>(
85        &self,
86        req: Self::HttpRequest,
87        process: F,
88    ) -> BoxStream<Res, Self::Error>
89    where
90        F: 'static + Send + Fn(Self::HttpResponse) -> BoxStream<Res, Self::Error>;
91
92    /// Builds an Api error from a response body.
93    ///
94    #[inline]
95    fn process_error_from_body(body: Bytes) -> Self::Error {
96        match serde_json::from_slice::<ApiError>(&body) {
97            Ok(e) => e.into(),
98            Err(_) => {
99                let err = match String::from_utf8(body.to_vec()) {
100                    Ok(s) => crate::Error::UnrecognizedApiError(s),
101                    Err(e) => crate::Error::from(e),
102                };
103
104                err.into()
105            }
106        }
107    }
108
109    /// Processes a response that expects a json encoded body, returning an
110    /// error or a deserialized json response.
111    ///
112    fn process_json_response<Res>(status: StatusCode, body: Bytes) -> Result<Res, Self::Error>
113    where
114        for<'de> Res: 'static + Deserialize<'de> + Send,
115    {
116        match status {
117            StatusCode::OK => serde_json::from_slice(&body)
118                .map_err(crate::Error::from)
119                .map_err(Self::Error::from),
120            _ => Err(Self::process_error_from_body(body)),
121        }
122    }
123
124    /// Processes a response that returns a stream of json deserializable
125    /// results.
126    ///
127    fn process_stream_response<D, Res>(
128        res: Self::HttpResponse,
129        decoder: D,
130    ) -> FramedRead<StreamReader<BoxStream<Bytes, Self::Error>>, D>
131    where
132        D: Decoder<Item = Res, Error = crate::Error>,
133    {
134        FramedRead::new(
135            StreamReader::new(Self::response_to_byte_stream(res)),
136            decoder,
137        )
138    }
139
140    /// Generic method for making a request to the Ipfs server, and getting
141    /// a deserializable response.
142    ///
143    async fn request<Req, Res>(
144        &self,
145        req: Req,
146        form: Option<multipart::Form<'static>>,
147    ) -> Result<Res, Self::Error>
148    where
149        Req: ApiRequest,
150        for<'de> Res: 'static + Deserialize<'de> + Send,
151    {
152        let (status, chunk) = self.request_raw(req, form).await?;
153
154        Self::process_json_response(status, chunk)
155    }
156
157    /// Generic method for making a request to the Ipfs server, and getting
158    /// back a response with no body.
159    ///
160    async fn request_empty<Req>(
161        &self,
162        req: Req,
163        form: Option<multipart::Form<'static>>,
164    ) -> Result<(), Self::Error>
165    where
166        Req: ApiRequest,
167    {
168        let (status, chunk) = self.request_raw(req, form).await?;
169
170        match status {
171            StatusCode::OK => Ok(()),
172            _ => Err(Self::process_error_from_body(chunk)),
173        }
174    }
175
176    /// Generic method for making a request to the Ipfs server, and getting
177    /// back a raw String response.
178    ///
179    async fn request_string<Req>(
180        &self,
181        req: Req,
182        form: Option<multipart::Form<'static>>,
183    ) -> Result<String, Self::Error>
184    where
185        Req: ApiRequest,
186    {
187        let (status, chunk) = self.request_raw(req, form).await?;
188
189        match status {
190            StatusCode::OK => String::from_utf8(chunk.to_vec())
191                .map_err(crate::Error::from)
192                .map_err(Self::Error::from),
193            _ => Err(Self::process_error_from_body(chunk)),
194        }
195    }
196
197    /// Generic method for making a request to the Ipfs server, and getting
198    /// back a raw stream of bytes.
199    ///
200    fn request_stream_bytes(&self, req: Self::HttpRequest) -> BoxStream<Bytes, Self::Error> {
201        self.request_stream(req, |res| Self::response_to_byte_stream(res))
202    }
203
204    /// Generic method to return a streaming response of deserialized json
205    /// objects delineated by new line separators.
206    ///
207    fn request_stream_json<Res>(&self, req: Self::HttpRequest) -> BoxStream<Res, Self::Error>
208    where
209        for<'de> Res: 'static + Deserialize<'de> + Send,
210    {
211        self.request_stream(req, |res| {
212            let parse_stream_error = if let Some(trailer) = Self::get_header(&res, TRAILER) {
213                // Response has the Trailer header set. The StreamError trailer
214                // is used to indicate that there was an error while streaming
215                // data with Ipfs.
216                //
217                if trailer == X_STREAM_ERROR_KEY {
218                    true
219                } else {
220                    let err = crate::Error::UnrecognizedTrailerHeader(
221                        String::from_utf8_lossy(trailer.as_ref()).into(),
222                    );
223
224                    // There was an unrecognized trailer value. If that is the case,
225                    // create a stream that immediately errors.
226                    //
227                    return Box::new(future::err(err).into_stream().err_into());
228                }
229            } else {
230                false
231            };
232
233            Box::new(
234                Self::process_stream_response(res, JsonLineDecoder::new(parse_stream_error))
235                    .err_into(),
236            )
237        })
238    }
239
240    /// Set basic authentication credentials to use on every request from this client.
241    fn with_credentials<U, P>(self, username: U, password: P) -> Self
242    where
243        U: Into<String>,
244        P: Into<String>;
245}