cognite/api/core/files.rs
1use std::collections::HashSet;
2
3use bytes::Bytes;
4use futures::TryStream;
5use serde::Serialize;
6use tokio_util::codec::{BytesCodec, FramedRead};
7
8use crate::api::resource::*;
9use crate::dto::core::files::*;
10use crate::dto::items::Items;
11use crate::error::Result;
12use crate::utils::lease::CleanResource;
13use crate::{Error, IdentityList, IdentityOrInstance, IdentityOrInstanceList, PartitionedFilter};
14use crate::{Identity, ItemsVec, Patch};
15
/// Files store documents, binary blobs, and other file data and relate it to assets.
///
/// This is the generic [`Resource`] type specialised to [`FileMetadata`]; the
/// file-specific operations are provided by the `impl Files` block and the trait
/// implementations in this module.
pub type Files = Resource<FileMetadata>;
18
// Associates the `files` path segment with the files resource.
impl WithBasePath for Files {
    // Same path segment that `Files::upload` posts to.
    const BASE_PATH: &'static str = "files";
}
22
// The impls below have empty bodies, so all of their behaviour comes from the
// default methods of the respective traits.

// Filter files using a `PartitionedFilter<FileFilter>` request.
impl FilterWithRequest<PartitionedFilter<FileFilter>, FileMetadata> for Files {}
// Search files using `FileFilter` and `FileSearch`.
impl SearchItems<'_, FileFilter, FileSearch, FileMetadata> for Files {}
// Retrieve files by `IdentityOrInstanceList<R>`, for any `R` where that list is serializable.
impl<R> RetrieveWithIgnoreUnknownIds<IdentityOrInstanceList<R>, FileMetadata> for Files
where
    IdentityOrInstanceList<R>: Serialize,
    R: Send + Sync,
{
}
// Delete files by `IdentityList<R>`, for any `R` where that list is serializable.
impl<R> DeleteWithIgnoreUnknownIds<IdentityList<R>> for Files
where
    IdentityList<R>: Serialize,
    R: Send + Sync,
{
}
// Update files using `Patch<PatchFile>` items.
impl Update<Patch<PatchFile>, FileMetadata> for Files {}
38
/// Utility for uploading files in multiple parts.
///
/// Upload each part with one of the `upload_part_*` methods, then call
/// `complete` once all parts are uploaded.
pub struct MultipartUploader<'a> {
    /// Files resource used to upload the parts and to complete the upload.
    resource: &'a Files,
    /// Identity of the file being uploaded; sent when completing the upload.
    id: IdentityOrInstance,
    /// Upload URLs (indexed by part number) and the upload ID of this upload.
    urls: MultiUploadUrls,
}
45
46impl<'a> MultipartUploader<'a> {
47 /// Create a new multipart uploader.
48 ///
49 /// # Arguments
50 ///
51 /// * `resource` - Files resource.
52 /// * `id` - ID of the file to upload.
53 /// * `urls` - Upload URLs returned from `init_multipart_upload`.
54 pub fn new(resource: &'a Files, id: IdentityOrInstance, urls: MultiUploadUrls) -> Self {
55 Self { resource, id, urls }
56 }
57
58 /// Upload a part given by part index given by `part_no`. The part number
59 /// counts from zero, so with 5 parts you must upload with `part_no` 0, 1, 2, 3, and 4.
60 ///
61 /// # Arguments
62 ///
63 /// * `part_no` - Part number.
64 /// * `stream` - Stream to upload.
65 /// * `size` - Size of stream to upload.
66 pub async fn upload_part_stream<S>(&self, part_no: usize, stream: S, size: u64) -> Result<()>
67 where
68 S: futures::TryStream + Send + 'static,
69 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
70 bytes::Bytes: From<S::Ok>,
71 {
72 if part_no >= self.urls.upload_urls.len() {
73 return Err(Error::Other("Part number out of range".to_owned()));
74 }
75
76 self.resource
77 .upload_stream_known_size("", &self.urls.upload_urls[0], stream, size)
78 .await
79 }
80
81 /// Upload a part given by part index given by `part_no`. The part number
82 /// counts from zero, so with 5 parts you must upload with `part_no` 0, 1, 2, 3, and 4.
83 ///
84 /// # Arguments
85 ///
86 /// * `part_no` - Part number.
87 /// * `file` - File to upload.
88 pub async fn upload_part_file<S>(&self, part_no: usize, file: tokio::fs::File) -> Result<()> {
89 let size = file.metadata().await?.len();
90 let stream = FramedRead::new(file, BytesCodec::new());
91
92 self.upload_part_stream(part_no, stream, size).await
93 }
94
95 /// Upload a part given by part index given by `part_no`. The part number
96 /// counts from zero, so with 5 parts you must upload with `part_no` 0, 1, 2, 3, and 4.
97 ///
98 /// # Arguments
99 ///
100 /// * `part_no` - Part number.
101 /// * `part` - Binary data to upload.
102 pub async fn upload_part_blob(&self, part_no: usize, part: impl Into<Bytes>) -> Result<()> {
103 if part_no >= self.urls.upload_urls.len() {
104 return Err(Error::Other("Part number out of range".to_owned()));
105 }
106 self.resource
107 .upload_blob("", &self.urls.upload_urls[part_no], part)
108 .await
109 }
110
111 /// Complete the multipart upload process after all parts are uploaded.
112 pub async fn complete(self) -> Result<()> {
113 self.resource
114 .complete_multipart_upload(self.id, self.urls.upload_id)
115 .await
116 }
117}
118
119impl Files {
120 /// Upload a stream to a url, the url is received from `Files::upload`
121 ///
122 /// # Arguments
123 ///
124 /// * `mime_type` - Mime type of file to upload. For example `application/pdf`.
125 /// * `url` - URL to upload stream to.
126 /// * `stream` - Stream to upload.
127 /// * `stream_chunked` - Set this to `true` to use chunked streaming. Note that this is not supported for the
128 /// azure file backend. If this is set to `false`, the entire file is read into memory before uploading, which may
129 /// be very expensive. Use `upload_stream_known_size` if the size of the file is known.
130 ///
131 /// # Example
132 ///
133 /// ```ignore
134 /// use tokio_util::codec::{BytesCodec, FramedRead};
135 ///
136 /// let file = tokio::fs::File::open("my-file");
137 /// let stream = FramedRead::new(file, BytesCodec::new());
138 /// cognite_client.files.upload_stream(&file.mime_type.unwrap(), &file.upload_url, stream, true).await?;
139 /// ```
140 ///
141 /// Note that `stream_chunked` being true is in general more efficient, but it is not supported
142 /// for the azure file backend.
143 pub async fn upload_stream<S>(
144 &self,
145 mime_type: &str,
146 url: &str,
147 stream: S,
148 stream_chunked: bool,
149 ) -> Result<()>
150 where
151 S: futures::TryStream + Send + 'static,
152 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
153 bytes::Bytes: From<S::Ok>,
154 {
155 self.api_client
156 .put_stream(url, mime_type, stream, stream_chunked, None)
157 .await
158 }
159
160 /// Upload a stream to an url, the url is received from `Files::upload`
161 /// This method requires that the length of the stream in bytes is known before hand.
162 /// If the specified size is wrong, the request may fail or even hang.
163 ///
164 /// # Arguments
165 ///
166 /// * `mime_type` - Mime type of file to upload. For example `application/pdf`.
167 /// * `url` - URL to upload stream to.
168 /// * `stream` - Stream to upload.
169 /// * `size` - Known size of stream in bytes. Note: Do not use this method if the size is not
170 /// actually known!
171 ///
172 /// # Example
173 ///
174 /// ```ignore
175 /// use tokio_util::codec::{BytesCodec, FramedRead};
176 ///
177 /// let file = tokio::fs::File::open("my-file").await?;
178 /// let size = file.metadata().await?.len();
179 /// let stream = FramedRead::new(file, BytesCodec::new());
180 ///
181 /// cognite_client.files.upload_stream_known_size(&file_res.mime_type.unwrap(), &file_res.extra.upload_url, stream, size).await?;
182 /// ```
183 ///
184 /// Note that this will still stream the data from disk, so it should be as efficient as `upload_stream` with
185 /// `upload_chunked`, but not require the target to accept `content-encoding: chunked`.
186 pub async fn upload_stream_known_size<S>(
187 &self,
188 mime_type: &str,
189 url: &str,
190 stream: S,
191 size: u64,
192 ) -> Result<()>
193 where
194 S: futures::TryStream + Send + 'static,
195 S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
196 bytes::Bytes: From<S::Ok>,
197 {
198 self.api_client
199 .put_stream(url, mime_type, stream, true, Some(size))
200 .await
201 }
202
203 /// Upload a file as a stream to CDF. `url` should be the upload URL returned from
204 /// `upload`.
205 ///
206 /// # Arguments
207 ///
208 /// * `mime_type` - Mime type of file to upload. For example `application/pdf`.
209 /// * `url` - URL to upload the file to.
210 /// * `file` - File to upload.
211 pub async fn upload_file(
212 &self,
213 mime_type: &str,
214 url: &str,
215 file: tokio::fs::File,
216 ) -> Result<()> {
217 let size = file.metadata().await?.len();
218 let stream = FramedRead::new(file, BytesCodec::new());
219
220 self.api_client
221 .put_stream(url, mime_type, stream, true, Some(size))
222 .await
223 }
224
225 /// Upload a binary vector to `url`.
226 ///
227 /// # Arguments
228 ///
229 /// * `mime_type` - Mime type of file to upload. For example `application/pdf`.
230 /// * `url` - URL to upload blob to.
231 /// * `blob` - File to upload, as bytes.
232 pub async fn upload_blob(
233 &self,
234 mime_type: &str,
235 url: &str,
236 blob: impl Into<Bytes>,
237 ) -> Result<()> {
238 self.api_client.put_blob(url, mime_type, blob).await
239 }
240
241 /// Create a file, optionally overwriting an existing file.
242 ///
243 /// The result will contain an upload URL that can be used to upload a file.
244 ///
245 /// # Arguments
246 ///
247 /// * `overwrite` - Set this to `true` to overwrite existing files with the same `external_id`.
248 /// If this is `false`, and a file with the given `external_id` already exists, the request will fail.
249 /// * `item` - The file to upload.
250 pub async fn upload(
251 &self,
252 overwrite: bool,
253 item: &AddFile,
254 ) -> Result<FileUploadResult<UploadUrl>> {
255 self.api_client
256 .post_with_query("files", item, Some(FileUploadQuery::new(overwrite)))
257 .await
258 }
259
260 /// Get an upload link for a file with given identity.
261 ///
262 /// # Arguments
263 ///
264 /// `id` - Identity of file metadata or data models file.
265 pub async fn get_upload_link(
266 &self,
267 id: &IdentityOrInstance,
268 ) -> Result<FileUploadResult<UploadUrl>> {
269 let mut res = self
270 .api_client
271 .post::<Items<Vec<FileUploadResult<UploadUrl>>>, _>(
272 "files/uploadlink",
273 &Items::new([id]),
274 )
275 .await?;
276 if res.items.is_empty() {
277 Err(Error::Other(
278 "File with given identity not found.".to_string(),
279 ))
280 } else {
281 Ok(res.items.remove(0))
282 }
283 }
284
285 /// Get multipart upload link for an existing file metadata or data models file.
286 ///
287 /// # Arguments
288 ///
289 /// * `id` - Identity of file metadata or data models file.
290 /// * `parts` - Number of parts to be uploaded.
291 pub async fn get_multipart_upload_link(
292 &self,
293 id: &IdentityOrInstance,
294 parts: u32,
295 ) -> Result<FileUploadResult<MultiUploadUrls>> {
296 let mut res = self
297 .api_client
298 .post_with_query::<Items<Vec<FileUploadResult<MultiUploadUrls>>>, _, _>(
299 "files/multiuploadlink",
300 &Items::new([id]),
301 Some(MultipartGetUploadLinkQuery::new(parts)),
302 )
303 .await?;
304 if res.items.is_empty() {
305 Err(Error::Other(
306 "File with given identity not found.".to_string(),
307 ))
308 } else {
309 Ok(res.items.remove(0))
310 }
311 }
312
313 /// Create a file, specifying that it should be uploaded in multiple parts.
314 ///
315 /// This returns a `MultipartUploader`, which wraps the upload process.
316 ///
317 /// # Arguments
318 ///
319 /// * `overwrite` - Set this to `true` to overwrite existing files with the same `external_id`.
320 /// If this is `false`, and a file with the given `external_id` already exists, the request will fail.
321 /// * `parts` - The number of parts to upload, should be a number between 1 and 250.
322 /// * `item` - The file to upload.
323 pub async fn multipart_upload<'a>(
324 &'a self,
325 overwrite: bool,
326 parts: u32,
327 item: &AddFile,
328 ) -> Result<(MultipartUploader<'a>, FileMetadata)> {
329 let res = self.init_multipart_upload(overwrite, parts, item).await?;
330 self.create_multipart_upload(res)
331 }
332
333 /// Upload files for an existing file metadata or data models file.
334 ///
335 /// This returns a `MultipartUploader`, which wraps the upload process.
336 ///
337 /// # Arguments
338 ///
339 /// * `parts` - The number of parts to upload, should be a number between 1 and 250.
340 /// * `id` - Identity of file metadata or data models file.
341 pub async fn multipart_upload_existing<'a>(
342 &'a self,
343 id: &IdentityOrInstance,
344 parts: u32,
345 ) -> Result<(MultipartUploader<'a>, FileMetadata)> {
346 let res = self.get_multipart_upload_link(id, parts).await?;
347 self.create_multipart_upload(res)
348 }
349
350 fn create_multipart_upload(
351 &self,
352 res: FileUploadResult<MultiUploadUrls>,
353 ) -> Result<(MultipartUploader<'_>, FileMetadata)> {
354 Ok((
355 MultipartUploader::new(
356 self,
357 IdentityOrInstance::Identity(Identity::Id {
358 id: res.metadata.id,
359 }),
360 res.extra,
361 ),
362 res.metadata,
363 ))
364 }
365
366 /// Create a file, specifying that it should be uploaded in multiple parts.
367 ///
368 /// # Arguments
369 ///
370 /// * `overwrite` - Set this to `true` to overwrite existing files with the same `external_id`.
371 /// If this is `false`, and a file with the given `external_id` already exists, the request will fail.
372 /// * `parts` - The number of parts to upload, should be a number between 1 and 250.
373 /// * `item` - The file to upload.
374 pub async fn init_multipart_upload(
375 &self,
376 overwrite: bool,
377 parts: u32,
378 item: &AddFile,
379 ) -> Result<FileUploadResult<MultiUploadUrls>> {
380 self.api_client
381 .post_with_query(
382 "files/initmultipartupload",
383 item,
384 Some(MultipartFileUploadQuery::new(overwrite, parts)),
385 )
386 .await
387 }
388
389 /// Complete a multipart upload. This endpoint must be called after all parts of a multipart file
390 /// upload have been uploaded.
391 ///
392 /// # Arguments
393 ///
394 /// * `id` - ID of the file that was uploaded.
395 /// * `upload_id` - `upload_id` returned by `init_multipart_upload`.
396 pub async fn complete_multipart_upload(
397 &self,
398 id: IdentityOrInstance,
399 upload_id: String,
400 ) -> Result<()> {
401 self.api_client
402 .post::<serde_json::Value, _>(
403 "files/completemultipartupload",
404 &CompleteMultipartUpload { id, upload_id },
405 )
406 .await?;
407 Ok(())
408 }
409
410 /// Get download links for a list of files.
411 ///
412 /// # Arguments
413 ///
414 /// * `ids` - List of file IDs or external IDs.
415 pub async fn download_link(&self, ids: &[IdentityOrInstance]) -> Result<Vec<FileDownloadUrl>> {
416 let items = Items::new(ids);
417 let file_links_response: ItemsVec<FileDownloadUrl> =
418 self.api_client.post("files/downloadlink", &items).await?;
419 Ok(file_links_response.items)
420 }
421
422 /// Stream a file from `url`.
423 ///
424 /// # Arguments
425 ///
426 /// * `url` - URL to download from.
427 pub async fn download(
428 &self,
429 url: &str,
430 ) -> Result<impl TryStream<Ok = bytes::Bytes, Error = reqwest::Error>> {
431 self.api_client.get_stream(url).await
432 }
433
434 /// Stream a file given by `id`.
435 ///
436 /// # Arguments
437 ///
438 /// * `id` - ID or external ID of file to download.
439 pub async fn download_file(
440 &self,
441 id: IdentityOrInstance,
442 ) -> Result<impl TryStream<Ok = bytes::Bytes, Error = reqwest::Error>> {
443 let items = vec![id];
444 let links = self.download_link(&items).await?;
445 let link = links.first().unwrap();
446 self.download(&link.download_url).await
447 }
448}
449
450impl CleanResource<FileMetadata> for Files {
451 async fn clean_resource(
452 &self,
453 resources: Vec<FileMetadata>,
454 ) -> std::result::Result<(), crate::Error> {
455 let ids = resources
456 .iter()
457 .map(|a| Identity::from(a.id))
458 .collect::<HashSet<Identity>>();
459 self.delete(&ids.into_iter().collect::<Vec<_>>(), true)
460 .await
461 }
462}