xt_oss/oss/api/objects/
multi_upload.rs

1// use crate::OssClient;
2use crate::oss::{api::objects::multi_upload::builders::AbortMultipartUploadBuilder, Client};
3use builders::{
4    CompleteMultipartUploadBuilder, InitiateMultipartUploadBuilder, ListMultipartUploadsBuilder,
5    UploadPartBuilder, UploadPartCopyBuilder,
6};
7
8use self::builders::ListPartsBuilder;
9
10#[allow(unused)]
11pub mod builders {
12    use std::collections::HashMap;
13
14    use chrono::{DateTime, Utc};
15    use reqwest::header::CONTENT_LENGTH;
16    use serde::{Deserialize, Serialize};
17
18    use crate::oss::{
19        self,
20        api::{
21            self, bucket::stand::builders::ListObjectQuery, insert_custom_header, insert_header,
22            ApiResponseFrom,
23        },
24        entities::{
25            multi_upload::{
26                CompleteMultipartUploadResult, InitiateMultipartUploadResult,
27                ListMultipartUploadsResult, ListPartsResult,
28            },
29            object, ServerSideEncryption, StorageClass,
30        },
31        http::{
32            self,
33            header::{CACHE_CONTROL, CONTENT_DISPOSITION, CONTENT_ENCODING, CONTENT_TYPE, EXPECT},
34        },
35    };
36
    /// Optional header values collected by `InitiateMultipartUploadBuilder`.
    ///
    /// Every field starts out empty (`None` / empty map) via `Default` and is
    /// filled in by the matching `with_*` setter on the builder.
    #[derive(Debug, Default)]
    struct InitiateMultipartUploadBuilderHeaders<'a> {
        // Value for the `Cache-Control` header.
        cache_control: Option<http::CacheControl>,
        // Value for the `Content-Disposition` header.
        content_disposition: Option<http::ContentDisposition>,
        // Value for the `Content-Encoding` header.
        content_encoding: Option<http::ContentEncoding>,
        // Expiry value supplied through `with_expires`.
        expires: Option<&'a str>,
        // Value for the `Content-Type` header.
        content_type: Option<&'a str>,
        // Sent as `x-oss-forbid-overwrite`.
        forbid_overwrite: Option<bool>,
        // Sent as `x-oss-server-side-encryption`.
        encryption: Option<ServerSideEncryption>,
        // Sent as `x-oss-server-side-data-encryption`.
        data_encryption: Option<ServerSideEncryption>,
        // Sent as `x-oss-server-side-encryption-key-id`.
        encryption_key_id: Option<&'a str>,
        // Sent as `x-oss-storage-class`.
        storage_class: Option<StorageClass>,
        // Tag key/value pairs; serialized as a query string into `x-oss-tagging`.
        oss_tagging: HashMap<String, String>,
    }
51
52    pub struct InitiateMultipartUploadBuilder<'a> {
53        client: &'a oss::Client<'a>,
54        object: &'a str,
55        encoding_type: Option<&'a str>,
56        headers: InitiateMultipartUploadBuilderHeaders<'a>,
57    }
58
59    impl<'a> InitiateMultipartUploadBuilder<'a> {
60        pub(crate) fn new(client: &'a oss::Client, object: &'a str) -> Self {
61            Self {
62                client,
63                object,
64                encoding_type: None,
65                headers: InitiateMultipartUploadBuilderHeaders::default(),
66            }
67        }
68
69        pub fn with_content_type(mut self, value: &'a str) -> Self {
70            self.headers.content_type = Some(value);
71            self
72        }
73
74        pub fn with_cache_control(mut self, value: http::CacheControl) -> Self {
75            self.headers.cache_control = Some(value);
76            self
77        }
78
79        pub fn with_content_disposition(mut self, value: http::ContentDisposition) -> Self {
80            self.headers.content_disposition = Some(value);
81            self
82        }
83
84        pub fn with_content_encoding(mut self, value: http::ContentEncoding) -> Self {
85            self.headers.content_encoding = Some(value);
86            self
87        }
88
89        pub fn with_expires(mut self, value: &'a str) -> Self {
90            self.headers.expires = Some(value);
91            self
92        }
93
94        pub fn with_forbid_overwrite(mut self, value: bool) -> Self {
95            self.headers.forbid_overwrite = Some(value);
96            self
97        }
98
99        pub fn with_encryption(mut self, value: ServerSideEncryption) -> Self {
100            self.headers.encryption = Some(value);
101            self
102        }
103
104        pub fn with_data_encryption(mut self, value: ServerSideEncryption) -> Self {
105            self.headers.data_encryption = Some(value);
106            self
107        }
108
109        pub fn with_encryption_key_id(mut self, value: &'a str) -> Self {
110            self.headers.encryption_key_id = Some(value);
111            self
112        }
113
114        pub fn with_storage_class(mut self, value: StorageClass) -> Self {
115            self.headers.storage_class = Some(value);
116            self
117        }
118
119        pub fn with_oss_tagging(mut self, key: &'a str, value: &'a str) -> Self {
120            self.headers
121                .oss_tagging
122                .insert(key.to_string(), value.to_string());
123            self
124        }
125
126        fn headers(&self) -> http::HeaderMap {
127            let mut headers = http::HeaderMap::new();
128
129            if let Some(cache_control) = &self.headers.cache_control {
130                insert_header(&mut headers, CACHE_CONTROL, cache_control);
131            }
132
133            if let Some(content_disposition) = &self.headers.content_disposition {
134                insert_header(&mut headers, CONTENT_DISPOSITION, content_disposition);
135            }
136
137            if let Some(content_type) = &self.headers.cache_control {
138                insert_header(&mut headers, CONTENT_TYPE, content_type);
139            }
140
141            if let Some(content_encoding) = &self.headers.content_encoding {
142                insert_header(&mut headers, CONTENT_ENCODING, content_encoding);
143            }
144
145            if let Some(expires) = &self.headers.expires {
146                insert_header(&mut headers, EXPECT, expires);
147            }
148
149            if let Some(content_type) = &self.headers.content_type {
150                insert_header(&mut headers, CONTENT_TYPE, content_type);
151            }
152
153            if let Some(forbid_overwrite) = &self.headers.forbid_overwrite {
154                insert_custom_header(&mut headers, "x-oss-forbid-overwrite", forbid_overwrite);
155            }
156
157            if let Some(encryption) = &self.headers.encryption {
158                insert_custom_header(&mut headers, "x-oss-server-side-encryption", encryption);
159            }
160
161            if let Some(data_encryption) = &self.headers.data_encryption {
162                insert_custom_header(
163                    &mut headers,
164                    "x-oss-server-side-data-encryption",
165                    data_encryption.to_string(),
166                );
167            }
168
169            if let Some(encryption_key_id) = &self.headers.encryption_key_id {
170                insert_custom_header(
171                    &mut headers,
172                    "x-oss-server-side-encryption-key-id",
173                    encryption_key_id,
174                );
175            }
176
177            if let Some(storage_class) = &self.headers.storage_class {
178                insert_custom_header(&mut headers, "x-oss-storage-class", storage_class);
179            }
180
181            if !self.headers.oss_tagging.is_empty() {
182                let value = serde_qs::to_string(&self.headers.oss_tagging)
183                    .expect("Failed to serialize tags");
184                insert_custom_header(&mut headers, "x-oss-tagging", value);
185            }
186
187            headers
188        }
189
190        pub async fn execute(&self) -> api::ApiResult<InitiateMultipartUploadResult> {
191            let mut res = format!("/{}/{}?{}", self.client.bucket(), self.object, "uploads");
192            let mut url = format!("{}?{}", self.client.object_url(self.object), "uploads");
193            if let Some(encoding_type) = self.encoding_type {
194                res = format!("{}&encoding_type={}", res, encoding_type);
195                url = format!("{}&encoding_type={}", res, encoding_type);
196            }
197
198            let resp = self
199                .client
200                .request
201                .task()
202                .with_url(&url)
203                .with_method(http::Method::POST)
204                .with_headers(self.headers())
205                .with_resource(&res)
206                .execute_timeout(self.client.timeout())
207                .await?;
208
209            Ok(ApiResponseFrom(resp).to_type().await)
210        }
211    }
212
213    pub struct UploadPartBuilder<'a> {
214        client: &'a oss::Client<'a>,
215        object: &'a str,
216        part_number: u32,
217        upload_id: &'a str,
218        content: oss::Bytes,
219    }
220
221    impl<'a> UploadPartBuilder<'a> {
222        pub(crate) fn new(client: &'a oss::Client, object: &'a str) -> Self {
223            Self {
224                client,
225                object,
226                part_number: Default::default(),
227                upload_id: Default::default(),
228                content: oss::Bytes::new(),
229            }
230        }
231
232        pub fn with_part_number(mut self, value: u32) -> Self {
233            self.part_number = value;
234            self
235        }
236
237        pub fn with_upload_id(mut self, value: &'a str) -> Self {
238            self.upload_id = value;
239            self
240        }
241
242        pub fn with_content(mut self, value: oss::Bytes) -> Self {
243            self.content = value;
244            self
245        }
246
247        pub async fn execute(&self) -> api::ApiResult {
248            let res = format!(
249                "/{}/{}?partNumber={}&uploadId={}",
250                self.client.bucket(),
251                self.object,
252                self.part_number,
253                self.upload_id
254            );
255            let url = format!(
256                "{}?partNumber={}&uploadId={}",
257                self.client.object_url(self.object),
258                self.part_number,
259                self.upload_id
260            );
261
262            let resp = self
263                .client
264                .request
265                .task()
266                .with_url(&url)
267                .with_method(http::Method::PUT)
268                .with_resource(&res)
269                .with_body(self.content.to_owned())
270                .execute_timeout(self.client.timeout())
271                .await?;
272
273            Ok(ApiResponseFrom(resp).to_empty().await)
274        }
275    }
276
    /// Builder for the `UploadPartCopy` request.
    ///
    /// Currently a stub: it only stores the client and `execute` is not
    /// implemented.
    pub struct UploadPartCopyBuilder<'a> {
        client: &'a oss::Client<'a>,
    }

    impl<'a> UploadPartCopyBuilder<'a> {
        /// Creates a builder bound to `client`.
        pub(crate) fn new(client: &'a oss::Client) -> Self {
            Self { client }
        }

        /// Placeholder for sending the request.
        ///
        /// # Panics
        ///
        /// Always panics when awaited, because the body is `todo!()`.
        pub async fn execute(&self) -> api::ApiResult {
            todo!()
        }
    }
290
    /// Query parameters serialized into the `CompleteMultipartUpload` URL.
    #[derive(Debug, Default, Serialize)]
    struct CompleteMultipartUploadBuilderQuery<'a> {
        // Always serialized as `uploadId`; defaults to the empty string.
        #[serde(rename = "uploadId")]
        upload_id: &'a str,
        // Serialized as `encoding-type`; omitted entirely while `None`.
        #[serde(rename = "encoding-type", skip_serializing_if = "Option::is_none")]
        encoding_type: Option<&'a str>,
    }
298
299    pub struct CompleteMultipartUploadBuilder<'a> {
300        client: &'a oss::Client<'a>,
301        object: &'a str,
302        forbid_overwrite: Option<bool>,
303        query: CompleteMultipartUploadBuilderQuery<'a>,
304    }
305
306    impl<'a> CompleteMultipartUploadBuilder<'a> {
307        pub(crate) fn new(client: &'a oss::Client, object: &'a str) -> Self {
308            Self {
309                client,
310                object,
311                forbid_overwrite: None,
312                query: CompleteMultipartUploadBuilderQuery::default(),
313            }
314        }
315
316        pub fn with_upload_id(mut self, value: &'a str) -> Self {
317            self.query.upload_id = value;
318            self
319        }
320
321        pub fn with_encoding_type(mut self, value: &'a str) -> Self {
322            self.query.encoding_type = Some(value);
323            self
324        }
325
326        pub fn with_forbid_overwrite(mut self, value: bool) -> Self {
327            self.forbid_overwrite = Some(value);
328            self
329        }
330
331        fn query(&self) -> String {
332            serde_qs::to_string(&self.query).unwrap()
333        }
334
335        pub async fn execute(&self) -> api::ApiResult<CompleteMultipartUploadResult> {
336            let res = format!(
337                "/{}/{}?uploadId={}",
338                self.client.bucket(),
339                &self.object,
340                self.query.upload_id
341            );
342            let url = format!("{}?{}", self.client.object_url(self.object), self.query());
343
344            let mut headers = http::HeaderMap::new();
345            insert_header(&mut headers, CONTENT_LENGTH, 0);
346            insert_custom_header(&mut headers, "x-oss-complete-all", "yes");
347            if let Some(true) = self.forbid_overwrite {
348                insert_custom_header(&mut headers, "x-oss-forbid-overwrite", "true");
349            }
350
351            let resp = self
352                .client
353                .request
354                .task()
355                .with_url(&url)
356                .with_method(http::Method::POST)
357                .with_headers(headers)
358                .with_resource(&res)
359                .execute()
360                .await?;
361            Ok(ApiResponseFrom(resp).to_type().await)
362        }
363    }
364
365    pub struct AbortMultipartUploadBuilder<'a> {
366        client: &'a oss::Client<'a>,
367        object: &'a str,
368        upload_id: &'a str,
369    }
370
371    impl<'a> AbortMultipartUploadBuilder<'a> {
372        pub(crate) fn new(client: &'a oss::Client, object: &'a str) -> Self {
373            Self {
374                client,
375                object,
376                upload_id: Default::default(),
377            }
378        }
379
380        pub fn with_upload_id(mut self, upload_id: &'a str) -> Self {
381            self.upload_id = upload_id;
382            self
383        }
384
385        pub async fn execute(&self) -> api::ApiResult {
386            let res = format!(
387                "/{}/{}?uploadId={}",
388                self.client.bucket(),
389                self.object,
390                self.upload_id
391            );
392            let url = format!(
393                "{}?uploadId={}",
394                self.client.object_url(self.object),
395                self.upload_id
396            );
397            let resp = self
398                .client
399                .request
400                .task()
401                .with_url(&url)
402                .with_method(http::Method::DELETE)
403                .with_resource(&res)
404                .execute()
405                .await?;
406            Ok(ApiResponseFrom(resp).to_empty().await)
407        }
408    }
409
    /// Query parameters for the `ListMultipartUploads` request.
    ///
    /// Every field is optional and is left out of the serialized query while
    /// it is `None`.
    #[derive(Debug, Default, Serialize, Deserialize)]
    pub(crate) struct ListMultipartUploadsBuilderQuery<'a> {
        /// Serialized as `delimiter`.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub(crate) delimiter: Option<&'a str>,
        /// Serialized as `encoding-type`.
        #[serde(rename = "encoding-type", skip_serializing_if = "Option::is_none")]
        pub(crate) encoding_type: Option<&'a str>,
        /// Serialized as `key-marker`.
        #[serde(rename = "key-marker", skip_serializing_if = "Option::is_none")]
        pub(crate) key_marker: Option<&'a str>,
        /// Serialized as `max-uploads`.
        #[serde(rename = "max-uploads", skip_serializing_if = "Option::is_none")]
        pub(crate) max_uploads: Option<u32>,
        /// Serialized as `prefix`.
        #[serde(skip_serializing_if = "Option::is_none")]
        pub(crate) prefix: Option<&'a str>,
        /// Serialized as `upload-id-marker`.
        #[serde(rename = "upload-id-marker", skip_serializing_if = "Option::is_none")]
        pub(crate) upload_id_marker: Option<&'a str>,
    }
425
426    pub struct ListMultipartUploadsBuilder<'a> {
427        client: &'a oss::Client<'a>,
428        query: ListMultipartUploadsBuilderQuery<'a>,
429    }
430
431    impl<'a> ListMultipartUploadsBuilder<'a> {
432        pub(crate) fn new(client: &'a oss::Client) -> Self {
433            Self {
434                client,
435                query: ListMultipartUploadsBuilderQuery::default(),
436            }
437        }
438
439        pub fn with_delimiter(mut self, value: &'a str) -> Self {
440            self.query.delimiter = Some(value);
441            self
442        }
443
444        pub fn max_uploads(mut self, value: u32) -> Self {
445            self.query.max_uploads = Some(value);
446            self
447        }
448
449        pub fn with_key_marker(mut self, value: &'a str) -> Self {
450            self.query.key_marker = Some(value);
451            self
452        }
453
454        pub fn with_prefix(mut self, value: &'a str) -> Self {
455            self.query.prefix = Some(value);
456            self
457        }
458
459        pub fn with_upload_id_marker(mut self, value: &'a str) -> Self {
460            self.query.upload_id_marker = Some(value);
461            self
462        }
463
464        pub fn with_encoding_type(mut self, value: &'a str) -> Self {
465            self.query.encoding_type = Some(value);
466            self
467        }
468
469        fn query(&self) -> String {
470            serde_qs::to_string(&self.query).unwrap()
471        }
472
473        pub async fn execute(&self) -> api::ApiResult<ListMultipartUploadsResult> {
474            let mut res = format!("/{}/?{}", self.client.bucket(), "uploads");
475            let mut url = format!("{}?{}", self.client.base_url(), "uploads");
476            let query = self.query();
477            if !query.is_empty() {
478                res = format!("{}&{}", &res, &query);
479                url = format!("{}&{}", &url, &query);
480            }
481            let resp = self
482                .client
483                .request
484                .task()
485                .with_url(&url)
486                .with_method(http::Method::GET)
487                .with_resource(&res)
488                .execute_timeout(self.client.timeout())
489                .await?;
490            Ok(ApiResponseFrom(resp).to_type().await)
491        }
492    }
493
494    #[derive(Debug, Default, Serialize, Deserialize)]
495    struct ListPartsBuilderQuery<'a> {
496        #[serde(rename = "uploadId")]
497        upload_id: &'a str,
498        #[serde(rename = "MaxParts", skip_serializing_if = "Option::is_none")]
499        max_parts: Option<u64>,
500        #[serde(rename = "PartNumberMarker", skip_serializing_if = "Option::is_none")]
501        part_number_marker: Option<u64>,
502        #[serde(rename = "EncodingType", skip_serializing_if = "Option::is_none")]
503        encoding_type: Option<&'a str>,
504    }
505
506    pub struct ListPartsBuilder<'a> {
507        client: &'a oss::Client<'a>,
508        object: &'a str,
509        query: ListPartsBuilderQuery<'a>,
510    }
511
512    impl<'a> ListPartsBuilder<'a> {
513        pub(crate) fn new(client: &'a oss::Client, object: &'a str) -> Self {
514            Self {
515                client,
516                object,
517                query: ListPartsBuilderQuery::default(),
518            }
519        }
520
521        pub fn with_upload_id(mut self, value: &'a str) -> Self {
522            self.query.upload_id = value;
523            self
524        }
525
526        pub fn with_max_parts(mut self, value: u64) -> Self {
527            self.query.max_parts = Some(value);
528            self
529        }
530
531        pub fn with_part_number_marker(mut self, value: u64) -> Self {
532            self.query.part_number_marker = Some(value);
533            self
534        }
535
536        pub fn with_encoding_type(mut self, value: &'a str) -> Self {
537            self.query.encoding_type = Some(value);
538            self
539        }
540
541        fn query(&self) -> String {
542            serde_qs::to_string(&self.query).unwrap()
543        }
544
545        pub async fn execute(&self) -> api::ApiResult<ListPartsResult> {
546            let mut res = format!("/{}/{}", self.client.bucket(), self.object);
547            let mut url = self.client.object_url(self.object);
548            let query = self.query();
549            if !query.is_empty() {
550                res = format!("{}?{}", res, query);
551                url = format!("{}?{}", url, query);
552            }
553            // dbg!(&res);
554            // dbg!(&url);
555            let resp = self
556                .client
557                .request
558                .task()
559                .with_url(&url)
560                .with_resource(&res)
561                .execute()
562                .await?;
563            Ok(ApiResponseFrom(resp).to_type().await)
564        }
565    }
566}
567
568/// 基础操作
569#[allow(non_snake_case)]
570impl<'a> Client<'a> {
571    /// 使用Multipart Upload模式传输数据前,您必须先调用InitiateMultipartUpload接口来通知OSS初始化一
572    /// 个Multipart Upload事件。
573    ///
574    /// - [official docs](https://help.aliyun.com/zh/oss/developer-reference/initiatemultipartupload)
575    /// - [xtoss example](https://github.com/isme-sun/xt_oss/blob/main/examples/api_object_mutil_init.rs)
576    pub fn InitiateMultipartUpload(&self, object: &'a str) -> InitiateMultipartUploadBuilder {
577        InitiateMultipartUploadBuilder::new(self, object)
578    }
579
580    /// 初始化一个MultipartUpload后,调用UploadPart接口根据指定的Object名和uploadId来分块`Part`上传数据。
581    ///
582    /// - [official docs](https://help.aliyun.com/zh/oss/developer-reference/uploadpart)
583    /// - [xtoss example](https://github.com/isme-sun/xt_oss/blob/main/examples/api_object_mutil_upload_part.rs)
584    pub fn UploadPart(&self, object: &'a str) -> UploadPartBuilder {
585        UploadPartBuilder::new(self, object)
586    }
587
588    /// 通过在UploadPart请求的基础上增加一个请求头x-oss-copy-source来调用UploadPartCopy接口,实现从一个
589    /// 已存在的Object中拷贝数据来上传一个Part。
590    ///
591    /// - [official docs](https://help.aliyun.com/zh/oss/developer-reference/uploadpartcopy)
592    /// - [xtoss example](https://github.com/isme-sun/xt_oss/blob/main/examples/api_object_mutil_upload_part.rs)
593    pub fn UploadPartCopy(&self) -> UploadPartCopyBuilder {
594        UploadPartCopyBuilder::new(self)
595    }
596
597    /// 在将所有数据Part都上传完成后,您必须调用CompleteMultipartUpload接口来完成整个文件的分片上传。
598    ///
599    /// - [official docs](https://help.aliyun.com/zh/oss/developer-reference/completemultipartupload)
600    /// - [xtoss example](https://github.com/isme-sun/xt_oss/blob/main/examples/api_object_mutil_comp.rs)
601    pub fn CompleteMultipartUpload(&self, object: &'a str) -> CompleteMultipartUploadBuilder {
602        CompleteMultipartUploadBuilder::new(self, object)
603    }
604
605    /// AbortMultipartUpload接口用于取消MultipartUpload事件并删除对应的Part数据。
606    ///
607    /// - [official docs](https://help.aliyun.com/zh/oss/developer-reference/abortmultipartupload)
608    /// - [xtoss example](https://github.com/isme-sun/xt_oss/blob/main/examples/api_object_mutil_abort.rs)
609    pub fn AbortMultipartUpload(&self, object: &'a str) -> AbortMultipartUploadBuilder {
610        AbortMultipartUploadBuilder::new(self, object)
611    }
612
613    /// 调用ListMultipartUploads接口列举所有执行中的Multipart Upload事件,即已经初始化但还未完成
614    /// `Complete`或者还未中止`Abort`的Multipart Upload事件。
615    ///
616    /// - [official docs](https://help.aliyun.com/zh/oss/developer-reference/listmultipartuploads)
617    /// - [xtoss example](https://github.com/isme-sun/xt_oss/blob/main/examples/api_object_mutil_list.rs)
618    pub fn ListMultipartUploads(&self) -> ListMultipartUploadsBuilder {
619        ListMultipartUploadsBuilder::new(self)
620    }
621
622    /// ListParts接口用于列举指定Upload ID所属的所有已经上传成功Part。
623    ///
624    /// - [official docs](https://help.aliyun.com/zh/oss/developer-reference/listparts)
625    /// - [xtoss example](https://github.com/isme-sun/xt_oss/blob/main/examples/api_object_mutil_list_part.rs)
626    pub fn ListParts(&self, object: &'a str) -> ListPartsBuilder {
627        ListPartsBuilder::new(self, object)
628    }
629}
630
#[cfg(test)]
mod tests {
    use super::builders::ListMultipartUploadsBuilderQuery;

    /// Serializes a fully populated query and checks that every field is
    /// emitted under its renamed key.
    #[test]
    fn list_multipart_uploads_builder_query() {
        let query = ListMultipartUploadsBuilderQuery {
            delimiter: Some("/"),
            max_uploads: Some(32),
            key_marker: Some("abc123"),
            prefix: Some("abc123"),
            upload_id_marker: Some("abc123"),
            encoding_type: Some("url"),
        };
        let q = serde_qs::to_string(&query).unwrap();
        println!("{}", q);

        let pairs: Vec<&str> = q.split('&').collect();
        assert_eq!(pairs.len(), 6, "unexpected query: {}", q);

        let expected = [
            "encoding-type=url",
            "key-marker=abc123",
            "max-uploads=32",
            "prefix=abc123",
            "upload-id-marker=abc123",
        ];
        for pair in expected.iter() {
            assert!(pairs.contains(pair), "missing `{}` in `{}`", pair, q);
        }

        // The delimiter value may be percent-encoded, so only the key is checked.
        assert!(
            pairs.iter().any(|p| p.starts_with("delimiter=")),
            "missing `delimiter` in `{}`",
            q
        );
    }
}