cognite/api/core/
files.rs

1use std::collections::HashSet;
2
3use bytes::Bytes;
4use futures::TryStream;
5use serde::Serialize;
6use tokio_util::codec::{BytesCodec, FramedRead};
7
8use crate::api::resource::*;
9use crate::dto::core::files::*;
10use crate::dto::items::Items;
11use crate::error::Result;
12use crate::utils::lease::CleanResource;
13use crate::{Error, IdentityList, IdentityOrInstance, IdentityOrInstanceList, PartitionedFilter};
14use crate::{Identity, ItemsVec, Patch};
15
/// Files store documents, binary blobs, and other file data and relate it to assets.
///
/// This is the generic [`Resource`] specialized for [`FileMetadata`].
pub type Files = Resource<FileMetadata>;
18
impl WithBasePath for Files {
    // Path segment of the files resource. Presumably consumed by the generic
    // resource traits to build endpoint paths - confirm against `WithBasePath`.
    const BASE_PATH: &'static str = "files";
}
22
23impl FilterWithRequest<PartitionedFilter<FileFilter>, FileMetadata> for Files {}
24impl SearchItems<'_, FileFilter, FileSearch, FileMetadata> for Files {}
25impl<R> RetrieveWithIgnoreUnknownIds<IdentityOrInstanceList<R>, FileMetadata> for Files
26where
27    IdentityOrInstanceList<R>: Serialize,
28    R: Send + Sync,
29{
30}
31impl<R> DeleteWithIgnoreUnknownIds<IdentityList<R>> for Files
32where
33    IdentityList<R>: Serialize,
34    R: Send + Sync,
35{
36}
37impl Update<Patch<PatchFile>, FileMetadata> for Files {}
38
/// Utility for uploading files in multiple parts.
///
/// Borrows the [`Files`] resource for the duration of the upload. Parts are
/// uploaded with the `upload_part_*` methods and the upload is finalized with
/// `complete`.
pub struct MultipartUploader<'a> {
    /// Files resource used to perform the part uploads and the completion call.
    resource: &'a Files,
    /// Identity of the file being uploaded, passed on when completing the upload.
    id: IdentityOrInstance,
    /// Upload URLs (indexed by part number) together with the upload ID.
    urls: MultiUploadUrls,
}
45
46impl<'a> MultipartUploader<'a> {
47    /// Create a new multipart uploader.
48    ///
49    /// # Arguments
50    ///
51    /// * `resource` - Files resource.
52    /// * `id` - ID of the file to upload.
53    /// * `urls` - Upload URLs returned from `init_multipart_upload`.
54    pub fn new(resource: &'a Files, id: IdentityOrInstance, urls: MultiUploadUrls) -> Self {
55        Self { resource, id, urls }
56    }
57
58    /// Upload a part given by part index given by `part_no`. The part number
59    /// counts from zero, so with 5 parts you must upload with `part_no` 0, 1, 2, 3, and 4.
60    ///
61    /// # Arguments
62    ///
63    /// * `part_no` - Part number.
64    /// * `stream` - Stream to upload.
65    /// * `size` - Size of stream to upload.
66    pub async fn upload_part_stream<S>(&self, part_no: usize, stream: S, size: u64) -> Result<()>
67    where
68        S: futures::TryStream + Send + 'static,
69        S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
70        bytes::Bytes: From<S::Ok>,
71    {
72        if part_no >= self.urls.upload_urls.len() {
73            return Err(Error::Other("Part number out of range".to_owned()));
74        }
75
76        self.resource
77            .upload_stream_known_size("", &self.urls.upload_urls[0], stream, size)
78            .await
79    }
80
81    /// Upload a part given by part index given by `part_no`. The part number
82    /// counts from zero, so with 5 parts you must upload with `part_no` 0, 1, 2, 3, and 4.
83    ///
84    /// # Arguments
85    ///
86    /// * `part_no` - Part number.
87    /// * `file` - File to upload.
88    pub async fn upload_part_file<S>(&self, part_no: usize, file: tokio::fs::File) -> Result<()> {
89        let size = file.metadata().await?.len();
90        let stream = FramedRead::new(file, BytesCodec::new());
91
92        self.upload_part_stream(part_no, stream, size).await
93    }
94
95    /// Upload a part given by part index given by `part_no`. The part number
96    /// counts from zero, so with 5 parts you must upload with `part_no` 0, 1, 2, 3, and 4.
97    ///
98    /// # Arguments
99    ///
100    /// * `part_no` - Part number.
101    /// * `part` - Binary data to upload.
102    pub async fn upload_part_blob(&self, part_no: usize, part: impl Into<Bytes>) -> Result<()> {
103        if part_no >= self.urls.upload_urls.len() {
104            return Err(Error::Other("Part number out of range".to_owned()));
105        }
106        self.resource
107            .upload_blob("", &self.urls.upload_urls[part_no], part)
108            .await
109    }
110
111    /// Complete the multipart upload process after all parts are uploaded.
112    pub async fn complete(self) -> Result<()> {
113        self.resource
114            .complete_multipart_upload(self.id, self.urls.upload_id)
115            .await
116    }
117}
118
119impl Files {
120    /// Upload a stream to a url, the url is received from `Files::upload`
121    ///
122    /// # Arguments
123    ///
124    /// * `mime_type` - Mime type of file to upload. For example `application/pdf`.
125    /// * `url` - URL to upload stream to.
126    /// * `stream` - Stream to upload.
127    /// * `stream_chunked` - Set this to `true` to use chunked streaming. Note that this is not supported for the
128    ///   azure file backend. If this is set to `false`, the entire file is read into memory before uploading, which may
129    ///   be very expensive. Use `upload_stream_known_size` if the size of the file is known.
130    ///
131    /// # Example
132    ///
133    /// ```ignore
134    /// use tokio_util::codec::{BytesCodec, FramedRead};
135    ///
136    /// let file = tokio::fs::File::open("my-file");
137    /// let stream = FramedRead::new(file, BytesCodec::new());
138    /// cognite_client.files.upload_stream(&file.mime_type.unwrap(), &file.upload_url, stream, true).await?;
139    /// ```
140    ///
141    /// Note that `stream_chunked` being true is in general more efficient, but it is not supported
142    /// for the azure file backend.
143    pub async fn upload_stream<S>(
144        &self,
145        mime_type: &str,
146        url: &str,
147        stream: S,
148        stream_chunked: bool,
149    ) -> Result<()>
150    where
151        S: futures::TryStream + Send + 'static,
152        S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
153        bytes::Bytes: From<S::Ok>,
154    {
155        self.api_client
156            .put_stream(url, mime_type, stream, stream_chunked, None)
157            .await
158    }
159
160    /// Upload a stream to an url, the url is received from `Files::upload`
161    /// This method requires that the length of the stream in bytes is known before hand.
162    /// If the specified size is wrong, the request may fail or even hang.
163    ///
164    /// # Arguments
165    ///
166    /// * `mime_type` - Mime type of file to upload. For example `application/pdf`.
167    /// * `url` - URL to upload stream to.
168    /// * `stream` - Stream to upload.
169    /// * `size` - Known size of stream in bytes. Note: Do not use this method if the size is not
170    ///   actually known!
171    ///
172    /// # Example
173    ///
174    /// ```ignore
175    /// use tokio_util::codec::{BytesCodec, FramedRead};
176    ///
177    /// let file = tokio::fs::File::open("my-file").await?;
178    /// let size = file.metadata().await?.len();
179    /// let stream = FramedRead::new(file, BytesCodec::new());
180    ///
181    /// cognite_client.files.upload_stream_known_size(&file_res.mime_type.unwrap(), &file_res.extra.upload_url, stream, size).await?;
182    /// ```
183    ///
184    /// Note that this will still stream the data from disk, so it should be as efficient as `upload_stream` with
185    /// `upload_chunked`, but not require the target to accept `content-encoding: chunked`.
186    pub async fn upload_stream_known_size<S>(
187        &self,
188        mime_type: &str,
189        url: &str,
190        stream: S,
191        size: u64,
192    ) -> Result<()>
193    where
194        S: futures::TryStream + Send + 'static,
195        S::Error: Into<Box<dyn std::error::Error + Send + Sync>>,
196        bytes::Bytes: From<S::Ok>,
197    {
198        self.api_client
199            .put_stream(url, mime_type, stream, true, Some(size))
200            .await
201    }
202
203    /// Upload a file as a stream to CDF. `url` should be the upload URL returned from
204    /// `upload`.
205    ///
206    /// # Arguments
207    ///
208    /// * `mime_type` - Mime type of file to upload. For example `application/pdf`.
209    /// * `url` - URL to upload the file to.
210    /// * `file` - File to upload.
211    pub async fn upload_file(
212        &self,
213        mime_type: &str,
214        url: &str,
215        file: tokio::fs::File,
216    ) -> Result<()> {
217        let size = file.metadata().await?.len();
218        let stream = FramedRead::new(file, BytesCodec::new());
219
220        self.api_client
221            .put_stream(url, mime_type, stream, true, Some(size))
222            .await
223    }
224
225    /// Upload a binary vector to `url`.
226    ///
227    /// # Arguments
228    ///
229    /// * `mime_type` - Mime type of file to upload. For example `application/pdf`.
230    /// * `url` - URL to upload blob to.
231    /// * `blob` - File to upload, as bytes.
232    pub async fn upload_blob(
233        &self,
234        mime_type: &str,
235        url: &str,
236        blob: impl Into<Bytes>,
237    ) -> Result<()> {
238        self.api_client.put_blob(url, mime_type, blob).await
239    }
240
241    /// Create a file, optionally overwriting an existing file.
242    ///
243    /// The result will contain an upload URL that can be used to upload a file.
244    ///
245    /// # Arguments
246    ///
247    /// * `overwrite` - Set this to `true` to overwrite existing files with the same `external_id`.
248    ///   If this is `false`, and a file with the given `external_id` already exists, the request will fail.
249    /// * `item` - The file to upload.
250    pub async fn upload(
251        &self,
252        overwrite: bool,
253        item: &AddFile,
254    ) -> Result<FileUploadResult<UploadUrl>> {
255        self.api_client
256            .post_with_query("files", item, Some(FileUploadQuery::new(overwrite)))
257            .await
258    }
259
260    /// Get an upload link for a file with given identity.
261    ///
262    /// # Arguments
263    ///
264    /// `id` - Identity of file metadata or data models file.
265    pub async fn get_upload_link(
266        &self,
267        id: &IdentityOrInstance,
268    ) -> Result<FileUploadResult<UploadUrl>> {
269        let mut res = self
270            .api_client
271            .post::<Items<Vec<FileUploadResult<UploadUrl>>>, _>(
272                "files/uploadlink",
273                &Items::new([id]),
274            )
275            .await?;
276        if res.items.is_empty() {
277            Err(Error::Other(
278                "File with given identity not found.".to_string(),
279            ))
280        } else {
281            Ok(res.items.remove(0))
282        }
283    }
284
285    /// Get multipart upload link for an existing file metadata or data models file.
286    ///
287    /// # Arguments
288    ///
289    /// * `id` - Identity of file metadata or data models file.
290    /// * `parts` - Number of parts to be uploaded.
291    pub async fn get_multipart_upload_link(
292        &self,
293        id: &IdentityOrInstance,
294        parts: u32,
295    ) -> Result<FileUploadResult<MultiUploadUrls>> {
296        let mut res = self
297            .api_client
298            .post_with_query::<Items<Vec<FileUploadResult<MultiUploadUrls>>>, _, _>(
299                "files/multiuploadlink",
300                &Items::new([id]),
301                Some(MultipartGetUploadLinkQuery::new(parts)),
302            )
303            .await?;
304        if res.items.is_empty() {
305            Err(Error::Other(
306                "File with given identity not found.".to_string(),
307            ))
308        } else {
309            Ok(res.items.remove(0))
310        }
311    }
312
313    /// Create a file, specifying that it should be uploaded in multiple parts.
314    ///
315    /// This returns a `MultipartUploader`, which wraps the upload process.
316    ///
317    /// # Arguments
318    ///
319    /// * `overwrite` - Set this to `true` to overwrite existing files with the same `external_id`.
320    ///   If this is `false`, and a file with the given `external_id` already exists, the request will fail.
321    /// * `parts` - The number of parts to upload, should be a number between 1 and 250.
322    /// * `item` - The file to upload.
323    pub async fn multipart_upload<'a>(
324        &'a self,
325        overwrite: bool,
326        parts: u32,
327        item: &AddFile,
328    ) -> Result<(MultipartUploader<'a>, FileMetadata)> {
329        let res = self.init_multipart_upload(overwrite, parts, item).await?;
330        self.create_multipart_upload(res)
331    }
332
333    /// Upload files for an existing file metadata or data models file.
334    ///
335    /// This returns a `MultipartUploader`, which wraps the upload process.
336    ///
337    /// # Arguments
338    ///
339    /// * `parts` - The number of parts to upload, should be a number between 1 and 250.
340    /// * `id` - Identity of file metadata or data models file.
341    pub async fn multipart_upload_existing<'a>(
342        &'a self,
343        id: &IdentityOrInstance,
344        parts: u32,
345    ) -> Result<(MultipartUploader<'a>, FileMetadata)> {
346        let res = self.get_multipart_upload_link(id, parts).await?;
347        self.create_multipart_upload(res)
348    }
349
350    fn create_multipart_upload(
351        &self,
352        res: FileUploadResult<MultiUploadUrls>,
353    ) -> Result<(MultipartUploader<'_>, FileMetadata)> {
354        Ok((
355            MultipartUploader::new(
356                self,
357                IdentityOrInstance::Identity(Identity::Id {
358                    id: res.metadata.id,
359                }),
360                res.extra,
361            ),
362            res.metadata,
363        ))
364    }
365
366    /// Create a file, specifying that it should be uploaded in multiple parts.
367    ///
368    /// # Arguments
369    ///
370    /// * `overwrite` - Set this to `true` to overwrite existing files with the same `external_id`.
371    ///   If this is `false`, and a file with the given `external_id` already exists, the request will fail.
372    /// * `parts` - The number of parts to upload, should be a number between 1 and 250.
373    /// * `item` - The file to upload.
374    pub async fn init_multipart_upload(
375        &self,
376        overwrite: bool,
377        parts: u32,
378        item: &AddFile,
379    ) -> Result<FileUploadResult<MultiUploadUrls>> {
380        self.api_client
381            .post_with_query(
382                "files/initmultipartupload",
383                item,
384                Some(MultipartFileUploadQuery::new(overwrite, parts)),
385            )
386            .await
387    }
388
389    /// Complete a multipart upload. This endpoint must be called after all parts of a multipart file
390    /// upload have been uploaded.
391    ///
392    /// # Arguments
393    ///
394    /// * `id` - ID of the file that was uploaded.
395    /// * `upload_id` - `upload_id` returned by `init_multipart_upload`.
396    pub async fn complete_multipart_upload(
397        &self,
398        id: IdentityOrInstance,
399        upload_id: String,
400    ) -> Result<()> {
401        self.api_client
402            .post::<serde_json::Value, _>(
403                "files/completemultipartupload",
404                &CompleteMultipartUpload { id, upload_id },
405            )
406            .await?;
407        Ok(())
408    }
409
410    /// Get download links for a list of files.
411    ///
412    /// # Arguments
413    ///
414    /// * `ids` - List of file IDs or external IDs.
415    pub async fn download_link(&self, ids: &[IdentityOrInstance]) -> Result<Vec<FileDownloadUrl>> {
416        let items = Items::new(ids);
417        let file_links_response: ItemsVec<FileDownloadUrl> =
418            self.api_client.post("files/downloadlink", &items).await?;
419        Ok(file_links_response.items)
420    }
421
422    /// Stream a file from `url`.
423    ///
424    /// # Arguments
425    ///
426    /// * `url` - URL to download from.
427    pub async fn download(
428        &self,
429        url: &str,
430    ) -> Result<impl TryStream<Ok = bytes::Bytes, Error = reqwest::Error>> {
431        self.api_client.get_stream(url).await
432    }
433
434    /// Stream a file given by `id`.
435    ///
436    /// # Arguments
437    ///
438    /// * `id` - ID or external ID of file to download.
439    pub async fn download_file(
440        &self,
441        id: IdentityOrInstance,
442    ) -> Result<impl TryStream<Ok = bytes::Bytes, Error = reqwest::Error>> {
443        let items = vec![id];
444        let links = self.download_link(&items).await?;
445        let link = links.first().unwrap();
446        self.download(&link.download_url).await
447    }
448}
449
450impl CleanResource<FileMetadata> for Files {
451    async fn clean_resource(
452        &self,
453        resources: Vec<FileMetadata>,
454    ) -> std::result::Result<(), crate::Error> {
455        let ids = resources
456            .iter()
457            .map(|a| Identity::from(a.id))
458            .collect::<HashSet<Identity>>();
459        self.delete(&ids.into_iter().collect::<Vec<_>>(), true)
460            .await
461    }
462}