cognite/api/api_client.rs
1use crate::IntoParams;
2use anyhow::anyhow;
3use bytes::Bytes;
4use futures::{TryStream, TryStreamExt};
5use prost::Message;
6use reqwest::header::{HeaderValue, CONTENT_LENGTH, CONTENT_TYPE};
7use reqwest::{Body, Response};
8use reqwest_middleware::ClientWithMiddleware;
9use serde::de::DeserializeOwned;
10use serde::ser::Serialize;
11
12use crate::error::{Error, Result};
13
14use super::request_builder::RequestBuilder;
15
16/// API client, used to query CDF.
17pub struct ApiClient {
18 api_base_url: String,
19 app_name: String,
20 client: ClientWithMiddleware,
21 api_version: Option<String>,
22}
23
24impl ApiClient {
25 /// Create a new api client.
26 ///
27 /// # Arguments
28 ///
29 /// * `api_base_url` - Base URL for CDF. For example `https://api.cognitedata.com`
30 /// * `app_name` - App name added to the `x-cdp-app` header.
31 /// * `client` - Underlying reqwest client.
32 pub fn new(api_base_url: &str, app_name: &str, client: ClientWithMiddleware) -> ApiClient {
33 ApiClient {
34 api_base_url: String::from(api_base_url),
35 app_name: String::from(app_name),
36 client,
37 api_version: None,
38 }
39 }
40
41 /// Create a new api client with a custom API version.
42 /// This will set the `cdf-version` header to the given value.
43 ///
44 /// # Arguments
45 ///
46 /// * `api_version` - API version to use.
47 pub fn clone_with_api_version(&self, api_version: &str) -> ApiClient {
48 // We do clone the base URL here, but note that the client is internally
49 // cloneable, so we do still share it cross resources, which is important.
50 ApiClient {
51 api_base_url: self.api_base_url.clone(),
52 app_name: self.app_name.clone(),
53 client: self.client.clone(),
54 api_version: Some(api_version.to_string()),
55 }
56 }
57
58 /// Perform a get request to the given path, deserializing the result from JSON.
59 ///
60 /// # Arguments
61 ///
62 /// * `path` - Request path, without leading slash.
63 pub async fn get<T: DeserializeOwned>(&self, path: &str) -> Result<T> {
64 RequestBuilder::get(self, format!("{}/{}", self.api_base_url, path))
65 .accept_json()
66 .send()
67 .await
68 }
69
70 /// Perform a get request to the given path, with a query given by `params`,
71 /// then deserialize the result from JSON.
72 ///
73 /// # Arguments
74 ///
75 /// * `path` - Request path, without leading slash.
76 /// * `params` - Optional object converted to query parameters.
77 pub async fn get_with_params<T: DeserializeOwned, R: IntoParams>(
78 &self,
79 path: &str,
80 params: Option<R>,
81 ) -> Result<T> {
82 let mut b =
83 RequestBuilder::get(self, format!("{}/{}", self.api_base_url, path)).accept_json();
84
85 if let Some(params) = params {
86 b = b.query(¶ms.into_params());
87 }
88
89 b.send().await
90 }
91
92 /// Perform a get request to the given URL, returning a stream.
93 ///
94 /// # Arguments
95 ///
96 /// * `url` - Full URL to get stream from.
97 pub async fn get_stream(
98 &self,
99 url: &str,
100 ) -> Result<impl TryStream<Ok = bytes::Bytes, Error = reqwest::Error>> {
101 let r = RequestBuilder::get(self, url)
102 .omit_auth_headers()
103 .accept_raw()
104 .send()
105 .await?;
106 Ok(r.bytes_stream())
107 }
108
109 /// Perform a post request to the given path, serializing `object` to JSON and sending it
110 /// as the body, then deserialize the response from JSON.
111 ///
112 /// # Arguments
113 ///
114 /// * `path` - Request path, without leading slash.
115 /// * `object` - Object converted to JSON body.
116 pub async fn post<D, S>(&self, path: &str, object: &S) -> Result<D>
117 where
118 D: DeserializeOwned,
119 S: Serialize,
120 {
121 RequestBuilder::post(self, format!("{}/{}", self.api_base_url, path))
122 .json(object)?
123 .accept_json()
124 .send()
125 .await
126 }
127
128 /// Create a request builder for a `GET` request to `path`.
129 pub fn get_request(&self, path: &str) -> RequestBuilder<'_, ()> {
130 RequestBuilder::get(self, format!("{}/{}", self.api_base_url, path))
131 }
132
133 /// Create a request builder for a `POST` request to `path`.
134 pub fn post_request(&self, path: &str) -> RequestBuilder<'_, ()> {
135 RequestBuilder::post(self, format!("{}/{}", self.api_base_url, path))
136 }
137
138 /// Create a request builder for a `PUT` request to `path`.
139 pub fn put_request(&self, path: &str) -> RequestBuilder<'_, ()> {
140 RequestBuilder::put(self, format!("{}/{}", self.api_base_url, path))
141 }
142
143 /// Create a request builder for a `Delete` request to `path`.
144 pub fn delete_request(&self, path: &str) -> RequestBuilder<'_, ()> {
145 RequestBuilder::delete(self, format!("{}/{}", self.api_base_url, path))
146 }
147
148 /// Perform a post request to the given path, with query parameters given by `params`.
149 /// Deserialize the response from JSON.
150 ///
151 /// * `path` - Request path, without leading slash.
152 /// * `object` - Object converted to JSON body.
153 /// * `params` - Object converted to query parameters.
154 pub async fn post_with_query<D: DeserializeOwned, S: Serialize, R: IntoParams>(
155 &self,
156 path: &str,
157 object: &S,
158 params: Option<R>,
159 ) -> Result<D> {
160 let mut b = self.post_request(path).json(object)?;
161 if let Some(params) = params {
162 b = b.query(¶ms.into_params());
163 }
164 b.accept_json().send().await
165 }
166
167 /// Perform a post request to the given path, posting `value` as protobuf.
168 /// Expects JSON as response.
169 ///
170 /// # Arguments
171 ///
172 /// * `path` - Request path without leading slash.
173 /// * `value` - Protobuf value to post.
174 pub async fn post_protobuf<D: DeserializeOwned + Send + Sync, T: Message>(
175 &self,
176 path: &str,
177 value: &T,
178 ) -> Result<D> {
179 self.post_request(path)
180 .protobuf(value)
181 .accept_json()
182 .send()
183 .await
184 }
185
186 /// Perform a post request to the given path, send `object` as JSON in the body,
187 /// then expect protobuf as response.
188 ///
189 /// # Arguments
190 ///
191 /// * `path` - Request path without leading slash.
192 /// * `object` - Object to convert to JSON and post.
193 pub async fn post_expect_protobuf<D: Message + Default, S: Serialize>(
194 &self,
195 path: &str,
196 object: &S,
197 ) -> Result<D> {
198 self.post_request(path)
199 .json(object)?
200 .accept_protobuf()
201 .send()
202 .await
203 }
204
205 /// Perform a put request with the data in `data`.
206 ///
207 /// # Arguments
208 ///
209 /// * `url` - URL to stream blob to.
210 /// * `mime_type` - What to put in the `X-Upload_Content-Type` header.
211 /// * `data` - Data to upload.
212 pub async fn put_blob(&self, url: &str, mime_type: &str, data: impl Into<Bytes>) -> Result<()> {
213 let bytes: Bytes = data.into();
214 let mut b = RequestBuilder::put(self, url)
215 .body(bytes)
216 .omit_auth_headers()
217 .accept_nothing();
218 if !mime_type.is_empty() {
219 b = b
220 .header(CONTENT_TYPE, HeaderValue::from_str(mime_type)?)
221 .header("X-Upload-Content-Type", HeaderValue::from_str(mime_type)?);
222 }
223 b.send().await
224 }
225
226 /// Perform a put request, streaming data to `url`.
227 ///
228 /// # Arguments
229 ///
230 /// * `url` - URL to stream blob to.
231 /// * `mime_type` - What to put in the `X-Upload_Content-Type` header.
232 /// * `stream` - Stream to upload.
233 /// * `stream_chunked` - If `true`, use chunked streaming to upload the data.
234 /// * `known_size` - Set the `Content-Length` header to this value.
235 ///
236 /// If `known_size` is `None` and `stream_chunked` is `true`, the request will be uploaded using
237 /// special chunked streaming logic. Some backends do not support this.
238 ///
239 /// If `stream_chunked` is true and `known_size` is Some, this will include a content length header,
240 /// it is highly recommended to set this whenever possible.
241 ///
242 /// # Warning
243 /// If `stream_chunked` is false, this will collect the input stream into a memory, which can
244 /// be _very_ expensive.
245 ///
246 pub async fn put_stream<S>(
247 &self,
248 url: &str,
249 mime_type: &str,
250 stream: S,
251 stream_chunked: bool,
252 known_size: Option<u64>,
253 ) -> Result<()>
254 where
255 S: futures::TryStream + Send + 'static,
256 S::Error: Into<Box<dyn std::error::Error + Send + Sync + 'static>>,
257 bytes::Bytes: From<S::Ok>,
258 {
259 let mut b = RequestBuilder::put(self, url)
260 .omit_auth_headers()
261 .accept_nothing();
262 if !mime_type.is_empty() {
263 b = b
264 .header(CONTENT_TYPE, HeaderValue::from_str(mime_type)?)
265 .header("X-Upload-Content-Type", HeaderValue::from_str(mime_type)?);
266 }
267
268 if stream_chunked {
269 if let Some(size) = known_size {
270 b = b.header(CONTENT_LENGTH, HeaderValue::from_str(&size.to_string())?);
271 }
272 b = b.body(Body::wrap_stream(stream));
273 } else {
274 let body: Vec<S::Ok> = stream
275 .try_collect()
276 .await
277 .map_err(|e| Error::StreamError(anyhow!(e.into())))?;
278 let body: Vec<u8> = body
279 .into_iter()
280 .flat_map(Into::<bytes::Bytes>::into)
281 .collect();
282 b = b.body(body);
283 }
284
285 b.send().await
286 }
287
288 /// Perform a put request to `path` with `object` as JSON, expecting JSON in return.
289 ///
290 /// # Arguments
291 ///
292 /// * `path` - Request path without leading slash.
293 /// * `object` - Object to send as JSON.
294 pub async fn put<D, S>(&self, path: &str, object: &S) -> Result<D>
295 where
296 D: DeserializeOwned,
297 S: Serialize,
298 {
299 self.put_request(path)
300 .json(object)?
301 .accept_json()
302 .send()
303 .await
304 }
305
306 /// Perform a delete request to `path`, expecting JSON as response.
307 ///
308 /// # Arguments
309 ///
310 /// * `path` - Request path without leading slash.
311 pub async fn delete<T: DeserializeOwned>(&self, path: &str) -> Result<T> {
312 self.delete_request(path).accept_json().send().await
313 }
314
315 /// Perform a delete request to `path`, with query parameters given by `params`.
316 ///
317 /// # Arguments
318 ///
319 /// * `path` - Request path without leading slash.
320 /// * `params` - Object converted to query parameters.
321 pub async fn delete_with_params<T: DeserializeOwned, R: IntoParams>(
322 &self,
323 path: &str,
324 params: Option<R>,
325 ) -> Result<T> {
326 let mut b = self.delete_request(path).accept_json();
327 if let Some(params) = params {
328 b = b.query(¶ms.into_params());
329 }
330 b.send().await
331 }
332
333 /// Send an arbitrary HTTP request using the client. This will not parse the response,
334 /// but will append authentication headers and retry with the same semantics as any
335 /// normal API call.
336 ///
337 /// # Arguments
338 ///
339 /// * `request_builder` - Request to send.
340 pub async fn send_request(
341 &self,
342 mut request_builder: reqwest_middleware::RequestBuilder,
343 ) -> Result<Response> {
344 request_builder.extensions().insert(self.client.clone());
345
346 Ok(request_builder.send().await?)
347 }
348
349 /// Get the inner HTTP client.
350 pub fn client(&self) -> &ClientWithMiddleware {
351 &self.client
352 }
353
354 /// Get the configured app name.
355 pub fn app_name(&self) -> &str {
356 &self.app_name
357 }
358
359 /// Get the configured API base URL.
360 pub fn api_base_url(&self) -> &str {
361 &self.api_base_url
362 }
363
364 /// Get the CDF API version this client will use.
365 /// Any requests made with the request builder will append a header
366 /// cdf-version with this value, if it is set.
367 pub fn api_version(&self) -> Option<&str> {
368 self.api_version.as_deref()
369 }
370}