1use anyhow::Result;
2use async_trait::async_trait;
3use bytes::Bytes;
4use derive_more::{Display, Error};
5use futures::{
6 future,
7 stream::{iter, BoxStream, Stream, StreamExt},
8};
9use libipld::Cid;
10use reqwest::{
11 header::{CONTENT_DISPOSITION, CONTENT_TYPE},
12 multipart::Form,
13 Client, RequestBuilder, Response, StatusCode,
14};
15use serde::{Deserialize, Serialize};
16use std::{
17 fmt::Debug,
18 str::FromStr,
19 sync::{Arc, RwLock},
20 time::Duration,
21};
22use url::Url;
23
24use crate::{
25 service::{
26 AuthenticationResponse, EventService, FilesGetResponse, OffsetsResponse, PublishRequest, PublishResponse,
27 QueryRequest, QueryResponse, SubscribeMonotonicRequest, SubscribeMonotonicResponse, SubscribeRequest,
28 SubscribeResponse,
29 },
30 AppManifest, NodeId,
31};
32use rand::Rng;
33
34#[derive(Clone, Debug, Error, Display, Serialize, Deserialize, PartialEq)]
40#[display(fmt = "error {} while {}: {}", error_code, context, error)]
41#[serde(rename_all = "camelCase")]
42pub struct HttpClientError {
43 pub error: serde_json::Value,
44 pub error_code: u16,
45 pub context: String,
46}
47
48#[derive(Clone)]
49pub struct HttpClient {
50 client: Client,
51 base_url: Url,
52 token: Arc<RwLock<String>>,
53 app_manifest: AppManifest,
54 node_id: NodeId,
55}
56
57async fn get_token(client: &Client, base_url: &Url, app_manifest: &AppManifest) -> anyhow::Result<String> {
58 let body = serde_json::to_value(app_manifest).context(|| format!("serializing {:?}", app_manifest))?;
59 let response = client.post(base_url.join("auth")?).json(&body).send().await?;
60 let bytes = response
61 .bytes()
62 .await
63 .context(|| "getting body for authentication response")?;
64 let token: AuthenticationResponse =
65 serde_json::from_slice(bytes.as_ref()).context(|| "deserializing authentication response")?;
66 Ok(token.token)
67}
68
69impl HttpClient {
70 pub async fn new(origin: Url, app_manifest: AppManifest) -> anyhow::Result<Self> {
73 anyhow::ensure!(!origin.cannot_be_a_base(), "{} is not a valid base address", origin);
74 let mut base_url = origin;
75 base_url.set_path("api/v2/");
76 let client = Client::new();
77
78 let node_id = client
79 .get(base_url.join("node/id").unwrap())
80 .send()
81 .await?
82 .text()
83 .await
84 .context(|| "getting body for GET node/id")?
85 .parse()?;
86
87 let token = get_token(&client, &base_url, &app_manifest).await?;
88
89 Ok(Self {
90 client,
91 base_url,
92 token: Arc::new(RwLock::new(token)),
93 app_manifest,
94 node_id,
95 })
96 }
97
98 pub fn node_id(&self) -> NodeId {
99 self.node_id
100 }
101
102 fn events_url(&self, path: &str) -> Url {
103 self.base_url.join(&format!("events/{}", path)).unwrap()
105 }
106
107 fn files_url(&self) -> Url {
108 self.base_url.join("files/").unwrap()
109 }
110
111 async fn re_authenticate(&self) -> anyhow::Result<String> {
112 let token = get_token(&self.client, &self.base_url, &self.app_manifest).await?;
113 let mut write_guard = self.token.write().unwrap();
114 *write_guard = token.clone();
115 Ok(token)
116 }
117
118 async fn do_request(&self, f: impl FnOnce(&Client) -> RequestBuilder) -> anyhow::Result<Response> {
121 let token = self.token.read().unwrap().clone();
122 let builder = f(&self.client);
123 let builder_clone = builder.try_clone();
124
125 let req = builder.header("Authorization", &format!("Bearer {}", token)).build()?;
126 let url = req.url().clone();
127 let method = req.method().clone();
128 let mut response = self
129 .client
130 .execute(req)
131 .await
132 .context(|| format!("sending {} {}", method, url))?;
133
134 if let Some(builder) = builder_clone.as_ref() {
135 if response.status() == StatusCode::UNAUTHORIZED {
137 let token = self.re_authenticate().await?;
138 response = builder
139 .try_clone()
140 .expect("Already cloned it once")
141 .header("Authorization", &format!("Bearer {}", token))
142 .send()
143 .await
144 .context(|| format!("sending {} {}", method, url))?;
145 }
146
147 let mut retries = 10;
148 let mut delay = Duration::from_secs(0);
149 loop {
150 if response.status() == StatusCode::SERVICE_UNAVAILABLE && retries > 0 {
151 retries -= 1;
152 delay = delay * 2 + Duration::from_millis(rand::thread_rng().gen_range(10..200));
153 tracing::debug!(
154 "Actyx Node is overloaded, retrying {} {} with a delay of {:?}",
155 method,
156 url,
157 delay
158 );
159 #[cfg(feature = "with-tokio")]
160 tokio::time::sleep(delay).await;
161 #[cfg(not(feature = "with-tokio"))]
162 std::thread::sleep(delay);
163 response = builder
164 .try_clone()
165 .expect("Already cloned it once")
166 .header("Authorization", &format!("Bearer {}", token))
167 .send()
168 .await
169 .context(|| format!("sending {} {}", method, url))?;
170 } else {
171 break;
172 }
173 }
174 } else {
175 tracing::warn!("Request can't be retried, as its body is based on a stream");
176 if response.status() == StatusCode::UNAUTHORIZED {
178 tracing::info!("Can't retry request, but re-authenticated anyway. SDK user must retry request.");
179 self.re_authenticate().await?;
180 }
181 }
182
183 if response.status().is_success() {
184 Ok(response)
185 } else {
186 let error_code = response.status().as_u16();
187 Err(HttpClientError {
188 error: response
189 .json()
190 .await
191 .context(|| format!("getting body for {} reply to {:?}", error_code, builder_clone))?,
192 error_code,
193 context: format!("sending {:?}", builder_clone),
194 }
195 .into())
196 }
197 }
198
199 pub async fn files_post(&self, files: impl IntoIterator<Item = reqwest::multipart::Part>) -> anyhow::Result<Cid> {
200 let mut form = Form::new();
201 for file in files {
202 form = form.part("file", file);
203 }
204 let response = self
205 .do_request(move |c| c.post(self.files_url()).multipart(form))
206 .await?;
207 let hash = response
208 .text_with_charset("utf-8")
209 .await
210 .context(|| "Parsing response".to_string())?;
211 let cid = Cid::from_str(&*hash).map_err(|e| HttpClientError {
212 error: serde_json::Value::String(e.to_string()),
213 error_code: 102,
214 context: format!("Tried to parse {} into a Cid", hash),
215 })?;
216 Ok(cid)
217 }
218
219 pub async fn files_get(&self, cid_or_name: &str) -> anyhow::Result<FilesGetResponse> {
220 let url = self.files_url().join(cid_or_name)?;
221 let response = self.do_request(move |c| c.get(url)).await?;
222
223 let maybe_name = response.headers().get(CONTENT_DISPOSITION).cloned();
224 let maybe_mime = response.headers().get(CONTENT_TYPE).cloned();
225 let bytes = response.bytes().await?;
226 if let Ok(dir @ FilesGetResponse::Directory { .. }) = serde_json::from_slice(bytes.as_ref()) {
227 Ok(dir)
228 } else {
229 let mime = maybe_mime
230 .and_then(|h| h.to_str().ok().map(|x| x.to_string()))
231 .unwrap_or_else(|| "application/octet-stream".to_string());
232 let name = maybe_name
233 .and_then(|n| {
234 n.to_str().ok().and_then(|p| {
235 p.split(';')
236 .find(|x| x.starts_with("filename="))
237 .map(|f| f.trim_start_matches("filename=").to_string())
238 })
239 })
240 .unwrap_or_else(|| "".to_string());
241 Ok(FilesGetResponse::File {
242 name,
243 bytes: bytes.to_vec(),
244 mime,
245 })
246 }
247 }
248}
249
250impl Debug for HttpClient {
251 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
252 f.debug_struct("HttpClient")
253 .field("base_url", &self.base_url.as_str())
254 .field("app_manifest", &self.app_manifest)
255 .finish()
256 }
257}
258
259#[async_trait]
260impl EventService for HttpClient {
261 async fn offsets(&self) -> anyhow::Result<OffsetsResponse> {
262 let response = self.do_request(|c| c.get(self.events_url("offsets"))).await?;
263 let bytes = response
264 .bytes()
265 .await
266 .context(|| format!("getting body for GET {}", self.events_url("offsets")))?;
267 Ok(serde_json::from_slice(bytes.as_ref()).context(|| {
268 format!(
269 "deserializing offsets response from {:?} received from GET {}",
270 bytes,
271 self.events_url("offsets")
272 )
273 })?)
274 }
275
276 async fn publish(&self, request: PublishRequest) -> anyhow::Result<PublishResponse> {
277 let body = serde_json::to_value(&request).context(|| format!("serializing {:?}", &request))?;
278 let response = self
279 .do_request(|c| c.post(self.events_url("publish")).json(&body))
280 .await?;
281 let bytes = response
282 .bytes()
283 .await
284 .context(|| format!("getting body for GET {}", self.events_url("publish")))?;
285 Ok(serde_json::from_slice(bytes.as_ref()).context(|| {
286 format!(
287 "deserializing publish response from {:?} received from GET {}",
288 bytes,
289 self.events_url("publish")
290 )
291 })?)
292 }
293
294 async fn query(&self, request: QueryRequest) -> anyhow::Result<BoxStream<'static, QueryResponse>> {
295 let body = serde_json::to_value(&request).context(|| format!("serializing {:?}", &request))?;
296 let response = self
297 .do_request(|c| c.post(self.events_url("query")).json(&body))
298 .await?;
299 let res = to_lines(response.bytes_stream())
300 .map(|bs| serde_json::from_slice(bs.as_ref()))
301 .filter_map(|res| future::ready(res.ok()));
303 Ok(res.boxed())
304 }
305
306 async fn subscribe(&self, request: SubscribeRequest) -> anyhow::Result<BoxStream<'static, SubscribeResponse>> {
307 let body = serde_json::to_value(&request).context(|| format!("serializing {:?}", &request))?;
308 let response = self
309 .do_request(|c| c.post(self.events_url("subscribe")).json(&body))
310 .await?;
311 let res = to_lines(response.bytes_stream())
312 .map(|bs| serde_json::from_slice(bs.as_ref()))
313 .filter_map(|res| future::ready(res.ok()));
315 Ok(res.boxed())
316 }
317
318 async fn subscribe_monotonic(
319 &self,
320 request: SubscribeMonotonicRequest,
321 ) -> anyhow::Result<BoxStream<'static, SubscribeMonotonicResponse>> {
322 let body = serde_json::to_value(&request).context(|| format!("serializing {:?}", &request))?;
323 let response = self
324 .do_request(|c| c.post(self.events_url("subscribe_monotonic")).json(&body))
325 .await?;
326 let res = to_lines(response.bytes_stream())
327 .map(|bs| serde_json::from_slice(bs.as_ref()))
328 .filter_map(|res| future::ready(res.ok()));
330 Ok(res.boxed())
331 }
332}
333
334pub(crate) fn to_lines(stream: impl Stream<Item = Result<Bytes, reqwest::Error>>) -> impl Stream<Item = Vec<u8>> {
335 let mut buf = Vec::<u8>::new();
336 let to_lines = move |bytes: Bytes| {
337 buf.extend_from_slice(bytes.as_ref());
338 let mut ret = buf.split(|b| *b == b'\n').map(|bs| bs.to_vec()).collect::<Vec<_>>();
339 if let Some(last) = ret.pop() {
340 buf.clear();
341 buf.extend_from_slice(last.as_ref());
342 }
343 iter(ret.into_iter().map(|mut bs| {
344 if bs.ends_with(b"\r") {
345 bs.pop();
346 }
347 bs
348 }))
349 };
350 stream
351 .take_while(|res| future::ready(res.is_ok()))
352 .map(|res| res.unwrap())
353 .map(to_lines)
354 .flatten()
355}
356
357pub(crate) trait WithContext {
358 type Output;
359 fn context<F, T>(self, context: F) -> Self::Output
360 where
361 T: Into<String>,
362 F: FnOnce() -> T;
363}
364impl<T, E> WithContext for std::result::Result<T, E>
365where
366 HttpClientError: From<(String, E)>,
367{
368 type Output = std::result::Result<T, HttpClientError>;
369
370 #[inline]
371 fn context<F, C>(self, context: F) -> Self::Output
372 where
373 C: Into<String>,
374 F: FnOnce() -> C,
375 {
376 match self {
377 Ok(value) => Ok(value),
378 Err(err) => Err(HttpClientError::from((context().into(), err))),
379 }
380 }
381}
382
383impl From<(String, reqwest::Error)> for HttpClientError {
384 fn from(e: (String, reqwest::Error)) -> Self {
385 Self {
386 error: serde_json::json!(format!("{:?}", e.1)),
387 error_code: 101,
388 context: e.0,
389 }
390 }
391}
392
393impl From<(String, serde_json::Error)> for HttpClientError {
394 fn from(e: (String, serde_json::Error)) -> Self {
395 Self {
396 error: serde_json::json!(format!("{:?}", e.1)),
397 error_code: 102,
398 context: e.0,
399 }
400 }
401}
402
403impl From<(String, serde_cbor::Error)> for HttpClientError {
404 fn from(e: (String, serde_cbor::Error)) -> Self {
405 Self {
406 error: serde_json::json!(format!("{:?}", e.1)),
407 error_code: 102,
408 context: e.0,
409 }
410 }
411}