actyx_sdk/
http_client.rs

1use anyhow::Result;
2use async_trait::async_trait;
3use bytes::Bytes;
4use derive_more::{Display, Error};
5use futures::{
6    future,
7    stream::{iter, BoxStream, Stream, StreamExt},
8};
9use libipld::Cid;
10use reqwest::{
11    header::{CONTENT_DISPOSITION, CONTENT_TYPE},
12    multipart::Form,
13    Client, RequestBuilder, Response, StatusCode,
14};
15use serde::{Deserialize, Serialize};
16use std::{
17    fmt::Debug,
18    str::FromStr,
19    sync::{Arc, RwLock},
20    time::Duration,
21};
22use url::Url;
23
24use crate::{
25    service::{
26        AuthenticationResponse, EventService, FilesGetResponse, OffsetsResponse, PublishRequest, PublishResponse,
27        QueryRequest, QueryResponse, SubscribeMonotonicRequest, SubscribeMonotonicResponse, SubscribeRequest,
28        SubscribeResponse,
29    },
30    AppManifest, NodeId,
31};
32use rand::Rng;
33
/// Error type that is returned in the response body by the Event Service when requests fail
///
/// The Event Service does not map client errors or internal errors to HTTP status codes,
/// instead it gives more structured information using this data type, except when the request
/// is not understood at all.
///
/// This file also constructs the type locally: `do_request` stores the HTTP status of a
/// non-success reply in `error_code`, the `From<(String, reqwest::Error)>` conversion uses
/// code 101, and the conversions from `serde_json::Error` / `serde_cbor::Error` as well as
/// the `Cid` parsing in `files_post` use code 102.
#[derive(Clone, Debug, Error, Display, Serialize, Deserialize, PartialEq)]
#[display(fmt = "error {} while {}: {}", error_code, context, error)]
#[serde(rename_all = "camelCase")]
pub struct HttpClientError {
    /// JSON value describing what went wrong.
    pub error: serde_json::Value,
    /// Numeric code of the failure (serialized as `errorCode`).
    pub error_code: u16,
    /// Description of the operation that was in progress when the error occurred.
    pub context: String,
}
47
/// HTTP client for the Actyx node API.
///
/// Clones share the same token slot through the `Arc`, so a token refreshed by one clone
/// is visible to all of them.
#[derive(Clone)]
pub struct HttpClient {
    /// Underlying reqwest client used for every request.
    client: Client,
    /// Base URL of the API; `new` sets its path to `api/v2/`.
    base_url: Url,
    /// Current bearer token; replaced by `re_authenticate`.
    token: Arc<RwLock<String>>,
    /// Manifest posted to the `auth` endpoint to obtain a token.
    app_manifest: AppManifest,
    /// Node id fetched from `node/id` when the client is created.
    node_id: NodeId,
}
56
57async fn get_token(client: &Client, base_url: &Url, app_manifest: &AppManifest) -> anyhow::Result<String> {
58    let body = serde_json::to_value(app_manifest).context(|| format!("serializing {:?}", app_manifest))?;
59    let response = client.post(base_url.join("auth")?).json(&body).send().await?;
60    let bytes = response
61        .bytes()
62        .await
63        .context(|| "getting body for authentication response")?;
64    let token: AuthenticationResponse =
65        serde_json::from_slice(bytes.as_ref()).context(|| "deserializing authentication response")?;
66    Ok(token.token)
67}
68
69impl HttpClient {
70    /// Configures connection to Actyx node with provided Url and AppManifest.
71    /// All path segments of the Url (if any) are discarded.
72    pub async fn new(origin: Url, app_manifest: AppManifest) -> anyhow::Result<Self> {
73        anyhow::ensure!(!origin.cannot_be_a_base(), "{} is not a valid base address", origin);
74        let mut base_url = origin;
75        base_url.set_path("api/v2/");
76        let client = Client::new();
77
78        let node_id = client
79            .get(base_url.join("node/id").unwrap())
80            .send()
81            .await?
82            .text()
83            .await
84            .context(|| "getting body for GET node/id")?
85            .parse()?;
86
87        let token = get_token(&client, &base_url, &app_manifest).await?;
88
89        Ok(Self {
90            client,
91            base_url,
92            token: Arc::new(RwLock::new(token)),
93            app_manifest,
94            node_id,
95        })
96    }
97
    /// Returns the node id that was fetched from `node/id` when this client was created.
    pub fn node_id(&self) -> NodeId {
        self.node_id
    }
101
102    fn events_url(&self, path: &str) -> Url {
103        // Safe to unwrap, because we fully control path creation
104        self.base_url.join(&format!("events/{}", path)).unwrap()
105    }
106
    /// Builds the URL `<base_url>/files/`; the trailing slash lets callers `join` a
    /// file name or CID onto it.
    fn files_url(&self) -> Url {
        self.base_url.join("files/").unwrap()
    }
110
111    async fn re_authenticate(&self) -> anyhow::Result<String> {
112        let token = get_token(&self.client, &self.base_url, &self.app_manifest).await?;
113        let mut write_guard = self.token.write().unwrap();
114        *write_guard = token.clone();
115        Ok(token)
116    }
117
118    /// Makes request to Actyx apis. On http authorization error tries to
119    /// re-authenticate and retries the request ten times with increasing delay.
120    async fn do_request(&self, f: impl FnOnce(&Client) -> RequestBuilder) -> anyhow::Result<Response> {
121        let token = self.token.read().unwrap().clone();
122        let builder = f(&self.client);
123        let builder_clone = builder.try_clone();
124
125        let req = builder.header("Authorization", &format!("Bearer {}", token)).build()?;
126        let url = req.url().clone();
127        let method = req.method().clone();
128        let mut response = self
129            .client
130            .execute(req)
131            .await
132            .context(|| format!("sending {} {}", method, url))?;
133
134        if let Some(builder) = builder_clone.as_ref() {
135            // Request body is not a Stream, so we can retry
136            if response.status() == StatusCode::UNAUTHORIZED {
137                let token = self.re_authenticate().await?;
138                response = builder
139                    .try_clone()
140                    .expect("Already cloned it once")
141                    .header("Authorization", &format!("Bearer {}", token))
142                    .send()
143                    .await
144                    .context(|| format!("sending {} {}", method, url))?;
145            }
146
147            let mut retries = 10;
148            let mut delay = Duration::from_secs(0);
149            loop {
150                if response.status() == StatusCode::SERVICE_UNAVAILABLE && retries > 0 {
151                    retries -= 1;
152                    delay = delay * 2 + Duration::from_millis(rand::thread_rng().gen_range(10..200));
153                    tracing::debug!(
154                        "Actyx Node is overloaded, retrying {} {} with a delay of {:?}",
155                        method,
156                        url,
157                        delay
158                    );
159                    #[cfg(feature = "with-tokio")]
160                    tokio::time::sleep(delay).await;
161                    #[cfg(not(feature = "with-tokio"))]
162                    std::thread::sleep(delay);
163                    response = builder
164                        .try_clone()
165                        .expect("Already cloned it once")
166                        .header("Authorization", &format!("Bearer {}", token))
167                        .send()
168                        .await
169                        .context(|| format!("sending {} {}", method, url))?;
170                } else {
171                    break;
172                }
173            }
174        } else {
175            tracing::warn!("Request can't be retried, as its body is based on a stream");
176            // Request body is a stream, so impossible to retry
177            if response.status() == StatusCode::UNAUTHORIZED {
178                tracing::info!("Can't retry request, but re-authenticated anyway. SDK user must retry request.");
179                self.re_authenticate().await?;
180            }
181        }
182
183        if response.status().is_success() {
184            Ok(response)
185        } else {
186            let error_code = response.status().as_u16();
187            Err(HttpClientError {
188                error: response
189                    .json()
190                    .await
191                    .context(|| format!("getting body for {} reply to {:?}", error_code, builder_clone))?,
192                error_code,
193                context: format!("sending {:?}", builder_clone),
194            }
195            .into())
196        }
197    }
198
199    pub async fn files_post(&self, files: impl IntoIterator<Item = reqwest::multipart::Part>) -> anyhow::Result<Cid> {
200        let mut form = Form::new();
201        for file in files {
202            form = form.part("file", file);
203        }
204        let response = self
205            .do_request(move |c| c.post(self.files_url()).multipart(form))
206            .await?;
207        let hash = response
208            .text_with_charset("utf-8")
209            .await
210            .context(|| "Parsing response".to_string())?;
211        let cid = Cid::from_str(&*hash).map_err(|e| HttpClientError {
212            error: serde_json::Value::String(e.to_string()),
213            error_code: 102,
214            context: format!("Tried to parse {} into a Cid", hash),
215        })?;
216        Ok(cid)
217    }
218
219    pub async fn files_get(&self, cid_or_name: &str) -> anyhow::Result<FilesGetResponse> {
220        let url = self.files_url().join(cid_or_name)?;
221        let response = self.do_request(move |c| c.get(url)).await?;
222
223        let maybe_name = response.headers().get(CONTENT_DISPOSITION).cloned();
224        let maybe_mime = response.headers().get(CONTENT_TYPE).cloned();
225        let bytes = response.bytes().await?;
226        if let Ok(dir @ FilesGetResponse::Directory { .. }) = serde_json::from_slice(bytes.as_ref()) {
227            Ok(dir)
228        } else {
229            let mime = maybe_mime
230                .and_then(|h| h.to_str().ok().map(|x| x.to_string()))
231                .unwrap_or_else(|| "application/octet-stream".to_string());
232            let name = maybe_name
233                .and_then(|n| {
234                    n.to_str().ok().and_then(|p| {
235                        p.split(';')
236                            .find(|x| x.starts_with("filename="))
237                            .map(|f| f.trim_start_matches("filename=").to_string())
238                    })
239                })
240                .unwrap_or_else(|| "".to_string());
241            Ok(FilesGetResponse::File {
242                name,
243                bytes: bytes.to_vec(),
244                mime,
245            })
246        }
247    }
248}
249
250impl Debug for HttpClient {
251    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
252        f.debug_struct("HttpClient")
253            .field("base_url", &self.base_url.as_str())
254            .field("app_manifest", &self.app_manifest)
255            .finish()
256    }
257}
258
259#[async_trait]
260impl EventService for HttpClient {
261    async fn offsets(&self) -> anyhow::Result<OffsetsResponse> {
262        let response = self.do_request(|c| c.get(self.events_url("offsets"))).await?;
263        let bytes = response
264            .bytes()
265            .await
266            .context(|| format!("getting body for GET {}", self.events_url("offsets")))?;
267        Ok(serde_json::from_slice(bytes.as_ref()).context(|| {
268            format!(
269                "deserializing offsets response from {:?} received from GET {}",
270                bytes,
271                self.events_url("offsets")
272            )
273        })?)
274    }
275
276    async fn publish(&self, request: PublishRequest) -> anyhow::Result<PublishResponse> {
277        let body = serde_json::to_value(&request).context(|| format!("serializing {:?}", &request))?;
278        let response = self
279            .do_request(|c| c.post(self.events_url("publish")).json(&body))
280            .await?;
281        let bytes = response
282            .bytes()
283            .await
284            .context(|| format!("getting body for GET {}", self.events_url("publish")))?;
285        Ok(serde_json::from_slice(bytes.as_ref()).context(|| {
286            format!(
287                "deserializing publish response from {:?} received from GET {}",
288                bytes,
289                self.events_url("publish")
290            )
291        })?)
292    }
293
294    async fn query(&self, request: QueryRequest) -> anyhow::Result<BoxStream<'static, QueryResponse>> {
295        let body = serde_json::to_value(&request).context(|| format!("serializing {:?}", &request))?;
296        let response = self
297            .do_request(|c| c.post(self.events_url("query")).json(&body))
298            .await?;
299        let res = to_lines(response.bytes_stream())
300            .map(|bs| serde_json::from_slice(bs.as_ref()))
301            // FIXME this swallows deserialization errors, silently dropping event envelopes
302            .filter_map(|res| future::ready(res.ok()));
303        Ok(res.boxed())
304    }
305
306    async fn subscribe(&self, request: SubscribeRequest) -> anyhow::Result<BoxStream<'static, SubscribeResponse>> {
307        let body = serde_json::to_value(&request).context(|| format!("serializing {:?}", &request))?;
308        let response = self
309            .do_request(|c| c.post(self.events_url("subscribe")).json(&body))
310            .await?;
311        let res = to_lines(response.bytes_stream())
312            .map(|bs| serde_json::from_slice(bs.as_ref()))
313            // FIXME this swallows deserialization errors, silently dropping event envelopes
314            .filter_map(|res| future::ready(res.ok()));
315        Ok(res.boxed())
316    }
317
318    async fn subscribe_monotonic(
319        &self,
320        request: SubscribeMonotonicRequest,
321    ) -> anyhow::Result<BoxStream<'static, SubscribeMonotonicResponse>> {
322        let body = serde_json::to_value(&request).context(|| format!("serializing {:?}", &request))?;
323        let response = self
324            .do_request(|c| c.post(self.events_url("subscribe_monotonic")).json(&body))
325            .await?;
326        let res = to_lines(response.bytes_stream())
327            .map(|bs| serde_json::from_slice(bs.as_ref()))
328            // FIXME this swallows deserialization errors, silently dropping event envelopes
329            .filter_map(|res| future::ready(res.ok()));
330        Ok(res.boxed())
331    }
332}
333
334pub(crate) fn to_lines(stream: impl Stream<Item = Result<Bytes, reqwest::Error>>) -> impl Stream<Item = Vec<u8>> {
335    let mut buf = Vec::<u8>::new();
336    let to_lines = move |bytes: Bytes| {
337        buf.extend_from_slice(bytes.as_ref());
338        let mut ret = buf.split(|b| *b == b'\n').map(|bs| bs.to_vec()).collect::<Vec<_>>();
339        if let Some(last) = ret.pop() {
340            buf.clear();
341            buf.extend_from_slice(last.as_ref());
342        }
343        iter(ret.into_iter().map(|mut bs| {
344            if bs.ends_with(b"\r") {
345                bs.pop();
346            }
347            bs
348        }))
349    };
350    stream
351        .take_while(|res| future::ready(res.is_ok()))
352        .map(|res| res.unwrap())
353        .map(to_lines)
354        .flatten()
355}
356
/// Attaches a lazily computed, human-readable context to a value.
pub(crate) trait WithContext {
    /// Type produced once the context has been attached.
    type Output;

    /// Consumes `self` and returns [`Self::Output`]; `context` is a closure so that
    /// implementations can avoid building the description unless it is needed.
    fn context<F: FnOnce() -> T, T: Into<String>>(self, context: F) -> Self::Output;
}
364impl<T, E> WithContext for std::result::Result<T, E>
365where
366    HttpClientError: From<(String, E)>,
367{
368    type Output = std::result::Result<T, HttpClientError>;
369
370    #[inline]
371    fn context<F, C>(self, context: F) -> Self::Output
372    where
373        C: Into<String>,
374        F: FnOnce() -> C,
375    {
376        match self {
377            Ok(value) => Ok(value),
378            Err(err) => Err(HttpClientError::from((context().into(), err))),
379        }
380    }
381}
382
383impl From<(String, reqwest::Error)> for HttpClientError {
384    fn from(e: (String, reqwest::Error)) -> Self {
385        Self {
386            error: serde_json::json!(format!("{:?}", e.1)),
387            error_code: 101,
388            context: e.0,
389        }
390    }
391}
392
393impl From<(String, serde_json::Error)> for HttpClientError {
394    fn from(e: (String, serde_json::Error)) -> Self {
395        Self {
396            error: serde_json::json!(format!("{:?}", e.1)),
397            error_code: 102,
398            context: e.0,
399        }
400    }
401}
402
403impl From<(String, serde_cbor::Error)> for HttpClientError {
404    fn from(e: (String, serde_cbor::Error)) -> Self {
405        Self {
406            error: serde_json::json!(format!("{:?}", e.1)),
407            error_code: 102,
408            context: e.0,
409        }
410    }
411}