cognite/api/
api_client.rs

1use crate::IntoParams;
2use anyhow::anyhow;
3use bytes::Bytes;
4use futures::{TryStream, TryStreamExt};
5use prost::Message;
6use reqwest::header::{HeaderValue, CONTENT_LENGTH, CONTENT_TYPE};
7use reqwest::{Body, Response};
8use reqwest_middleware::ClientWithMiddleware;
9use serde::de::DeserializeOwned;
10use serde::ser::Serialize;
11
12use crate::error::{Error, Result};
13
14use super::request_builder::RequestBuilder;
15
16/// API client, used to query CDF.
17pub struct ApiClient {
18    api_base_url: String,
19    app_name: String,
20    client: ClientWithMiddleware,
21    api_version: Option<String>,
22}
23
24impl ApiClient {
25    /// Create a new api client.
26    ///
27    /// # Arguments
28    ///
29    /// * `api_base_url` - Base URL for CDF. For example `https://api.cognitedata.com`
30    /// * `app_name` - App name added to the `x-cdp-app` header.
31    /// * `client` - Underlying reqwest client.
32    pub fn new(api_base_url: &str, app_name: &str, client: ClientWithMiddleware) -> ApiClient {
33        ApiClient {
34            api_base_url: String::from(api_base_url),
35            app_name: String::from(app_name),
36            client,
37            api_version: None,
38        }
39    }
40
41    /// Create a new api client with a custom API version.
42    /// This will set the `cdf-version` header to the given value.
43    ///
44    /// # Arguments
45    ///
46    /// * `api_version` - API version to use.
47    pub fn clone_with_api_version(&self, api_version: &str) -> ApiClient {
48        // We do clone the base URL here, but note that the client is internally
49        // cloneable, so we do still share it cross resources, which is important.
50        ApiClient {
51            api_base_url: self.api_base_url.clone(),
52            app_name: self.app_name.clone(),
53            client: self.client.clone(),
54            api_version: Some(api_version.to_string()),
55        }
56    }
57
58    /// Perform a get request to the given path, deserializing the result from JSON.
59    ///
60    /// # Arguments
61    ///
62    /// * `path` - Request path, without leading slash.
63    pub async fn get<T: DeserializeOwned>(&self, path: &str) -> Result<T> {
64        RequestBuilder::get(self, format!("{}/{}", self.api_base_url, path))
65            .accept_json()
66            .send()
67            .await
68    }
69
70    /// Perform a get request to the given path, with a query given by `params`,
71    /// then deserialize the result from JSON.
72    ///
73    /// # Arguments
74    ///
75    /// * `path` - Request path, without leading slash.
76    /// * `params` - Optional object converted to query parameters.
77    pub async fn get_with_params<T: DeserializeOwned, R: IntoParams>(
78        &self,
79        path: &str,
80        params: Option<R>,
81    ) -> Result<T> {
82        let mut b =
83            RequestBuilder::get(self, format!("{}/{}", self.api_base_url, path)).accept_json();
84
85        if let Some(params) = params {
86            b = b.query(&params.into_params());
87        }
88
89        b.send().await
90    }
91
92    /// Perform a get request to the given URL, returning a stream.
93    ///
94    /// # Arguments
95    ///
96    /// * `url` - Full URL to get stream from.
97    pub async fn get_stream(
98        &self,
99        url: &str,
100    ) -> Result<impl TryStream<Ok = bytes::Bytes, Error = reqwest::Error>> {
101        let r = RequestBuilder::get(self, url)
102            .omit_auth_headers()
103            .accept_raw()
104            .send()
105            .await?;
106        Ok(r.bytes_stream())
107    }
108
109    /// Perform a post request to the given path, serializing `object` to JSON and sending it
110    /// as the body, then deserialize the response from JSON.
111    ///
112    /// # Arguments
113    ///
114    /// * `path` - Request path, without leading slash.
115    /// * `object` - Object converted to JSON body.
116    pub async fn post<D, S>(&self, path: &str, object: &S) -> Result<D>
117    where
118        D: DeserializeOwned,
119        S: Serialize,
120    {
121        RequestBuilder::post(self, format!("{}/{}", self.api_base_url, path))
122            .json(object)?
123            .accept_json()
124            .send()
125            .await
126    }
127
128    /// Create a request builder for a `GET` request to `path`.
129    pub fn get_request(&self, path: &str) -> RequestBuilder<'_, ()> {
130        RequestBuilder::get(self, format!("{}/{}", self.api_base_url, path))
131    }
132
133    /// Create a request builder for a `POST` request to `path`.
134    pub fn post_request(&self, path: &str) -> RequestBuilder<'_, ()> {
135        RequestBuilder::post(self, format!("{}/{}", self.api_base_url, path))
136    }
137
138    /// Create a request builder for a `PUT` request to `path`.
139    pub fn put_request(&self, path: &str) -> RequestBuilder<'_, ()> {
140        RequestBuilder::put(self, format!("{}/{}", self.api_base_url, path))
141    }
142
143    /// Create a request builder for a `Delete` request to `path`.
144    pub fn delete_request(&self, path: &str) -> RequestBuilder<'_, ()> {
145        RequestBuilder::delete(self, format!("{}/{}", self.api_base_url, path))
146    }
147
148    /// Perform a post request to the given path, with query parameters given by `params`.
149    /// Deserialize the response from JSON.
150    ///
151    /// * `path` - Request path, without leading slash.
152    /// * `object` - Object converted to JSON body.
153    /// * `params` - Object converted to query parameters.
154    pub async fn post_with_query<D: DeserializeOwned, S: Serialize, R: IntoParams>(
155        &self,
156        path: &str,
157        object: &S,
158        params: Option<R>,
159    ) -> Result<D> {
160        let mut b = self.post_request(path).json(object)?;
161        if let Some(params) = params {
162            b = b.query(&params.into_params());
163        }
164        b.accept_json().send().await
165    }
166
167    /// Perform a post request to the given path, posting `value` as protobuf.
168    /// Expects JSON as response.
169    ///
170    /// # Arguments
171    ///
172    /// * `path` - Request path without leading slash.
173    /// * `value` - Protobuf value to post.
174    pub async fn post_protobuf<D: DeserializeOwned + Send + Sync, T: Message>(
175        &self,
176        path: &str,
177        value: &T,
178    ) -> Result<D> {
179        self.post_request(path)
180            .protobuf(value)
181            .accept_json()
182            .send()
183            .await
184    }
185
186    /// Perform a post request to the given path, send `object` as JSON in the body,
187    /// then expect protobuf as response.
188    ///
189    /// # Arguments
190    ///
191    /// * `path` - Request path without leading slash.
192    /// * `object` - Object to convert to JSON and post.
193    pub async fn post_expect_protobuf<D: Message + Default, S: Serialize>(
194        &self,
195        path: &str,
196        object: &S,
197    ) -> Result<D> {
198        self.post_request(path)
199            .json(object)?
200            .accept_protobuf()
201            .send()
202            .await
203    }
204
205    /// Perform a put request with the data in `data`.
206    ///
207    /// # Arguments
208    ///
209    /// * `url` - URL to stream blob to.
210    /// * `mime_type` - What to put in the `X-Upload_Content-Type` header.
211    /// * `data` - Data to upload.
212    pub async fn put_blob(&self, url: &str, mime_type: &str, data: impl Into<Bytes>) -> Result<()> {
213        let bytes: Bytes = data.into();
214        let mut b = RequestBuilder::put(self, url)
215            .body(bytes)
216            .omit_auth_headers()
217            .accept_nothing();
218        if !mime_type.is_empty() {
219            b = b
220                .header(CONTENT_TYPE, HeaderValue::from_str(mime_type)?)
221                .header("X-Upload-Content-Type", HeaderValue::from_str(mime_type)?);
222        }
223        b.send().await
224    }
225
226    /// Perform a put request, streaming data to `url`.
227    ///
228    /// # Arguments
229    ///
230    /// * `url` - URL to stream blob to.
231    /// * `mime_type` - What to put in the `X-Upload_Content-Type` header.
232    /// * `stream` - Stream to upload.
233    /// * `stream_chunked` - If `true`, use chunked streaming to upload the data.
234    /// * `known_size` - Set the `Content-Length` header to this value.
235    ///
236    /// If `known_size` is `None` and `stream_chunked` is `true`, the request will be uploaded using
237    /// special chunked streaming logic. Some backends do not support this.
238    ///
239    /// If `stream_chunked` is true and `known_size` is Some, this will include a content length header,
240    /// it is highly recommended to set this whenever possible.
241    ///
242    /// # Warning
243    /// If `stream_chunked` is false, this will collect the input stream into a memory, which can
244    /// be _very_ expensive.
245    ///
246    pub async fn put_stream<S>(
247        &self,
248        url: &str,
249        mime_type: &str,
250        stream: S,
251        stream_chunked: bool,
252        known_size: Option<u64>,
253    ) -> Result<()>
254    where
255        S: futures::TryStream + Send + 'static,
256        S::Error: Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
257        bytes::Bytes: From<S::Ok>,
258    {
259        let mut b = RequestBuilder::put(self, url)
260            .omit_auth_headers()
261            .accept_nothing();
262        if !mime_type.is_empty() {
263            b = b
264                .header(CONTENT_TYPE, HeaderValue::from_str(mime_type)?)
265                .header("X-Upload-Content-Type", HeaderValue::from_str(mime_type)?);
266        }
267
268        if stream_chunked {
269            if let Some(size) = known_size {
270                b = b.header(CONTENT_LENGTH, HeaderValue::from_str(&size.to_string())?);
271            }
272            b = b.body(Body::wrap_stream(stream));
273        } else {
274            let body: Vec<S::Ok> = stream
275                .try_collect()
276                .await
277                .map_err(|e| Error::StreamError(anyhow!(e.into())))?;
278            let body: Vec<u8> = body
279                .into_iter()
280                .flat_map(Into::<bytes::Bytes>::into)
281                .collect();
282            b = b.body(body);
283        }
284
285        b.send().await
286    }
287
288    /// Perform a put request to `path` with `object` as JSON, expecting JSON in return.
289    ///
290    /// # Arguments
291    ///
292    /// * `path` - Request path without leading slash.
293    /// * `object` - Object to send as JSON.
294    pub async fn put<D, S>(&self, path: &str, object: &S) -> Result<D>
295    where
296        D: DeserializeOwned,
297        S: Serialize,
298    {
299        self.put_request(path)
300            .json(object)?
301            .accept_json()
302            .send()
303            .await
304    }
305
306    /// Perform a delete request to `path`, expecting JSON as response.
307    ///
308    /// # Arguments
309    ///
310    /// * `path` - Request path without leading slash.
311    pub async fn delete<T: DeserializeOwned>(&self, path: &str) -> Result<T> {
312        self.delete_request(path).accept_json().send().await
313    }
314
315    /// Perform a delete request to `path`, with query parameters given by `params`.
316    ///
317    /// # Arguments
318    ///
319    /// * `path` - Request path without leading slash.
320    /// * `params` - Object converted to query parameters.
321    pub async fn delete_with_params<T: DeserializeOwned, R: IntoParams>(
322        &self,
323        path: &str,
324        params: Option<R>,
325    ) -> Result<T> {
326        let mut b = self.delete_request(path).accept_json();
327        if let Some(params) = params {
328            b = b.query(&params.into_params());
329        }
330        b.send().await
331    }
332
333    /// Send an arbitrary HTTP request using the client. This will not parse the response,
334    /// but will append authentication headers and retry with the same semantics as any
335    /// normal API call.
336    ///
337    /// # Arguments
338    ///
339    /// * `request_builder` - Request to send.
340    pub async fn send_request(
341        &self,
342        mut request_builder: reqwest_middleware::RequestBuilder,
343    ) -> Result<Response> {
344        request_builder.extensions().insert(self.client.clone());
345
346        Ok(request_builder.send().await?)
347    }
348
349    /// Get the inner HTTP client.
350    pub fn client(&self) -> &ClientWithMiddleware {
351        &self.client
352    }
353
354    /// Get the configured app name.
355    pub fn app_name(&self) -> &str {
356        &self.app_name
357    }
358
359    /// Get the configured API base URL.
360    pub fn api_base_url(&self) -> &str {
361        &self.api_base_url
362    }
363
364    /// Get the CDF API version this client will use.
365    /// Any requests made with the request builder will append a header
366    /// cdf-version with this value, if it is set.
367    pub fn api_version(&self) -> Option<&str> {
368        self.api_version.as_deref()
369    }
370}