ipfs_api_backend_actix/
backend.rs1use crate::error::Error;
10use async_trait::async_trait;
11use awc::Client;
12use bytes::Bytes;
13use futures::{FutureExt, StreamExt, TryFutureExt, TryStreamExt};
14use http::{
15 header::{HeaderName, HeaderValue},
16 uri::Scheme,
17 StatusCode, Uri,
18};
19use ipfs_api_prelude::{ApiRequest, Backend, BoxStream, TryFromUri};
20use multipart::client::multipart;
21use std::time::Duration;
22
23const ACTIX_REQUEST_TIMEOUT: Duration = Duration::from_secs(90);
24
25pub struct ActixBackend {
26 base: Uri,
27 client: Client,
28
29 credentials: Option<(String, String)>,
31}
32
33impl Default for ActixBackend {
34 fn default() -> Self {
38 Self::from_ipfs_config()
39 .unwrap_or_else(|| Self::from_host_and_port(Scheme::HTTP, "localhost", 5001).unwrap())
40 }
41}
42
43impl TryFromUri for ActixBackend {
44 fn build_with_base_uri(base: Uri) -> Self {
45 let client = Client::default();
46
47 ActixBackend {
48 base,
49 client,
50 credentials: None,
51 }
52 }
53}
54
55impl ActixBackend {
56 pub fn with_credentials<U, P>(self, username: U, password: P) -> Self
57 where
58 U: Into<String>,
59 P: Into<String>,
60 {
61 Self {
62 base: self.base,
63 client: self.client,
64 credentials: Some((username.into(), password.into())),
65 }
66 }
67}
68
69#[async_trait(?Send)]
70impl Backend for ActixBackend {
71 type HttpRequest = awc::SendClientRequest;
72
73 type HttpResponse = awc::ClientResponse<actix_http::encoding::Decoder<actix_http::Payload>>;
74
75 type Error = Error;
76
77 fn with_credentials<U, P>(self, username: U, password: P) -> Self
78 where
79 U: Into<String>,
80 P: Into<String>,
81 {
82 (self as ActixBackend).with_credentials(username, password)
83 }
84
85 fn build_base_request<Req>(
86 &self,
87 req: Req,
88 form: Option<multipart::Form<'static>>,
89 ) -> Result<Self::HttpRequest, Error>
90 where
91 Req: ApiRequest,
92 {
93 let url = req.absolute_url(&self.base)?;
94 let req = self.client.request(Req::METHOD, url);
95 let req = if let Some((username, password)) = &self.credentials {
96 req.basic_auth(username, password)
97 } else {
98 req
99 };
100 let req = if let Some(form) = form {
101 req.content_type(form.content_type())
102 .send_body(multipart::Body::from(form))
103 } else {
104 req.timeout(ACTIX_REQUEST_TIMEOUT).send()
105 };
106
107 Ok(req)
108 }
109
110 fn get_header(res: &Self::HttpResponse, key: HeaderName) -> Option<&HeaderValue> {
111 res.headers().get(key)
112 }
113
114 async fn request_raw<Req>(
115 &self,
116 req: Req,
117 form: Option<multipart::Form<'static>>,
118 ) -> Result<(StatusCode, Bytes), Self::Error>
119 where
120 Req: ApiRequest,
121 {
122 let req = self.build_base_request(req, form)?;
123 let mut res = req.await?;
124 let status = res.status();
125 let body = res.body().await?;
126
127 Ok((status, body))
129 }
130
131 fn response_to_byte_stream(res: Self::HttpResponse) -> BoxStream<Bytes, Self::Error> {
132 let stream = res.err_into();
133
134 Box::new(stream)
135 }
136
137 fn request_stream<Res, F>(
138 &self,
139 req: Self::HttpRequest,
140 process: F,
141 ) -> BoxStream<Res, Self::Error>
142 where
143 F: 'static + Send + Fn(Self::HttpResponse) -> BoxStream<Res, Self::Error>,
144 {
145 let stream = req
146 .err_into()
147 .map_ok(move |mut res| {
148 match res.status() {
149 StatusCode::OK => process(res).right_stream(),
150 _ => res
155 .body()
156 .map(|maybe_body| match maybe_body {
157 Ok(body) => Err(Self::process_error_from_body(body)),
158 Err(e) => Err(e.into()),
159 })
160 .into_stream()
161 .left_stream(),
162 }
163 })
164 .try_flatten_stream();
165
166 Box::new(stream)
167 }
168}