Skip to main content

ali_oss_rs/
multipart.rs

//! Multipart upload related operations module
2
3use std::{ops::Range, path::Path};
4
5use async_trait::async_trait;
6use base64::{prelude::BASE64_STANDARD, Engine};
7
8use crate::{
9    error::Error,
10    multipart_common::{
11        build_complete_multipart_uploads_request, build_initiate_multipart_uploads_request, build_list_multipart_uploads_request, build_list_parts_request,
12        build_upload_part_copy_request, build_upload_part_request, CompleteMultipartUploadApiResponse, CompleteMultipartUploadOptions,
13        CompleteMultipartUploadRequest, CompleteMultipartUploadResult, InitiateMultipartUploadOptions, InitiateMultipartUploadResult,
14        ListMultipartUploadsOptions, ListMultipartUploadsResult, ListPartsOptions, ListPartsResult, UploadPartCopyOptions, UploadPartCopyRequest,
15        UploadPartCopyResult, UploadPartRequest, UploadPartResult,
16    },
17    request::{OssRequest, RequestMethod},
18    util::{validate_bucket_name, validate_object_key},
19    Client, RequestBody, Result,
20};
21
22#[async_trait]
23pub trait MultipartUploadsOperations {
24    /// List multipart uploads which are initialized but not completed nor aborted.
25    ///
26    /// Official document: <https://help.aliyun.com/zh/oss/developer-reference/listmultipartuploads>
27    async fn list_multipart_uploads<S>(&self, bucket_name: S, options: Option<ListMultipartUploadsOptions>) -> Result<ListMultipartUploadsResult>
28    where
29        S: AsRef<str> + Send;
30
31    /// List parts which uploaded successfully associated with the given `upload_id`
32    ///
33    /// Official document: <https://help.aliyun.com/zh/oss/developer-reference/listparts>
34    async fn list_parts<S1, S2, S3>(&self, bucket_name: S1, object_key: S2, upload_id: S3, options: Option<ListPartsOptions>) -> Result<ListPartsResult>
35    where
36        S1: AsRef<str> + Send,
37        S2: AsRef<str> + Send,
38        S3: AsRef<str> + Send;
39
40    /// Initiate multipart uploads
41    ///
42    /// Official document: <https://help.aliyun.com/zh/oss/developer-reference/initiatemultipartupload>
43    async fn initiate_multipart_uploads<S1, S2>(
44        &self,
45        bucket_name: S1,
46        object_key: S2,
47        options: Option<InitiateMultipartUploadOptions>,
48    ) -> Result<InitiateMultipartUploadResult>
49    where
50        S1: AsRef<str> + Send,
51        S2: AsRef<str> + Send;
52
53    /// Upload part of a file. the caller should take responsibility to make sure the range is valid.
54    ///
55    /// Official document: <https://help.aliyun.com/zh/oss/developer-reference/uploadpart>
56    async fn upload_part_from_file<S1, S2, P>(
57        &self,
58        bucket_name: S1,
59        object_key: S2,
60        file_path: P,
61        range: Range<u64>,
62        params: UploadPartRequest,
63    ) -> Result<UploadPartResult>
64    where
65        S1: AsRef<str> + Send,
66        S2: AsRef<str> + Send,
67        P: AsRef<Path> + Send;
68
69    /// Upload part from buffer.
70    ///
71    /// Official document: <https://help.aliyun.com/zh/oss/developer-reference/uploadpart>
72    async fn upload_part_from_buffer<S1, S2, B>(&self, bucket_name: S1, object_key: S2, buffer: B, params: UploadPartRequest) -> Result<UploadPartResult>
73    where
74        S1: AsRef<str> + Send,
75        S2: AsRef<str> + Send,
76        B: Into<Vec<u8>> + Send;
77
78    /// Upload part from base64 string.
79    ///
80    /// Official document: <https://help.aliyun.com/zh/oss/developer-reference/uploadpart>
81    async fn upload_part_from_base64<S1, S2, S3>(
82        &self,
83        bucket_name: S1,
84        object_key: S2,
85        base64_string: S3,
86        params: UploadPartRequest,
87    ) -> Result<UploadPartResult>
88    where
89        S1: AsRef<str> + Send,
90        S2: AsRef<str> + Send,
91        S3: AsRef<str> + Send;
92
93    /// When you want to copy a file larger than 1GB, you must use `upload_part_copy`.
94    /// First, initiate a multipart upload and get `uploadId`, then call this method to upload parts of the source object.
95    /// Finally complete the multipart upload by invoking `complete_multipart_uploads`
96    ///
97    /// Official document: <https://help.aliyun.com/zh/oss/developer-reference/uploadpartcopy>
98    async fn upload_part_copy<S1, S2>(
99        &self,
100        bucket_name: S1,
101        dest_object_key: S2,
102        data: UploadPartCopyRequest,
103        options: Option<UploadPartCopyOptions>,
104    ) -> Result<UploadPartCopyResult>
105    where
106        S1: AsRef<str> + Send,
107        S2: AsRef<str> + Send;
108
109    /// Complete multipart uploads
110    ///
111    /// Official document: <https://help.aliyun.com/zh/oss/developer-reference/completemultipartupload>
112    async fn complete_multipart_uploads<S1, S2>(
113        &self,
114        bucket_name: S1,
115        object_key: S2,
116        data: CompleteMultipartUploadRequest,
117        options: Option<CompleteMultipartUploadOptions>,
118    ) -> Result<CompleteMultipartUploadResult>
119    where
120        S1: AsRef<str> + Send,
121        S2: AsRef<str> + Send;
122
123    /// About multipart uploads
124    ///
125    /// Official document: <https://help.aliyun.com/zh/oss/developer-reference/abortmultipartupload>
126    async fn abort_multipart_uploads<S1, S2, S3>(&self, bucket_name: S1, object_key: S2, upload_id: S3) -> Result<()>
127    where
128        S1: AsRef<str> + Send,
129        S2: AsRef<str> + Send,
130        S3: AsRef<str> + Send;
131}
132
133#[async_trait]
134impl MultipartUploadsOperations for Client {
135    /// List multipart uploads which are initialized but not completed nor aborted.
136    ///
137    /// Official document: <https://help.aliyun.com/zh/oss/developer-reference/listmultipartuploads>
138    async fn list_multipart_uploads<S>(&self, bucket_name: S, options: Option<ListMultipartUploadsOptions>) -> Result<ListMultipartUploadsResult>
139    where
140        S: AsRef<str> + Send,
141    {
142        if !validate_bucket_name(bucket_name.as_ref()) {
143            return Err(Error::Other(format!("invalid bucket name: {}", bucket_name.as_ref())));
144        }
145        let request = build_list_multipart_uploads_request(bucket_name.as_ref(), &options)?;
146        let (_, xml) = self.do_request::<String>(request).await?;
147
148        ListMultipartUploadsResult::from_xml(&xml)
149    }
150
151    /// List parts which uploaded successfully associated with the given `upload_id`
152    ///
153    /// Official document: <https://help.aliyun.com/zh/oss/developer-reference/listparts>
154    async fn list_parts<S1, S2, S3>(&self, bucket_name: S1, object_key: S2, upload_id: S3, options: Option<ListPartsOptions>) -> Result<ListPartsResult>
155    where
156        S1: AsRef<str> + Send,
157        S2: AsRef<str> + Send,
158        S3: AsRef<str> + Send,
159    {
160        let request = build_list_parts_request(bucket_name.as_ref(), object_key.as_ref(), upload_id.as_ref(), &options)?;
161        let (_, xml) = self.do_request::<String>(request).await?;
162        ListPartsResult::from_xml(&xml)
163    }
164
165    /// Initiate multipart uploads
166    ///
167    /// Official document: <https://help.aliyun.com/zh/oss/developer-reference/initiatemultipartupload>
168    async fn initiate_multipart_uploads<S1, S2>(
169        &self,
170        bucket_name: S1,
171        object_key: S2,
172        options: Option<InitiateMultipartUploadOptions>,
173    ) -> Result<InitiateMultipartUploadResult>
174    where
175        S1: AsRef<str> + Send,
176        S2: AsRef<str> + Send,
177    {
178        let request = build_initiate_multipart_uploads_request(bucket_name.as_ref(), object_key.as_ref(), &options)?;
179        let (_, xml) = self.do_request::<String>(request).await?;
180        InitiateMultipartUploadResult::from_xml(&xml)
181    }
182
183    /// Upload part of a file. the caller should take responsibility to make sure the range is valid.
184    ///
185    /// Official document: <https://help.aliyun.com/zh/oss/developer-reference/uploadpart>
186    async fn upload_part_from_file<S1, S2, P>(
187        &self,
188        bucket_name: S1,
189        object_key: S2,
190        file_path: P,
191        range: Range<u64>,
192        params: UploadPartRequest,
193    ) -> Result<UploadPartResult>
194    where
195        S1: AsRef<str> + Send,
196        S2: AsRef<str> + Send,
197        P: AsRef<Path> + Send,
198    {
199        let request = build_upload_part_request(
200            bucket_name.as_ref(),
201            object_key.as_ref(),
202            RequestBody::File(file_path.as_ref().to_path_buf(), Some(range)),
203            params,
204        )?;
205
206        let (headers, _) = self.do_request::<()>(request).await?;
207
208        Ok(headers.into())
209    }
210
211    /// Upload part from buffer.
212    ///
213    /// Official document: <https://help.aliyun.com/zh/oss/developer-reference/uploadpart>
214    async fn upload_part_from_buffer<S1, S2, B>(&self, bucket_name: S1, object_key: S2, buffer: B, params: UploadPartRequest) -> Result<UploadPartResult>
215    where
216        S1: AsRef<str> + Send,
217        S2: AsRef<str> + Send,
218        B: Into<Vec<u8>> + Send,
219    {
220        let request = build_upload_part_request(bucket_name.as_ref(), object_key.as_ref(), RequestBody::Bytes(buffer.into()), params)?;
221
222        let (headers, _) = self.do_request::<()>(request).await?;
223
224        Ok(headers.into())
225    }
226
227    /// Upload part from base64 string.
228    ///
229    /// Official document: <https://help.aliyun.com/zh/oss/developer-reference/uploadpart>
230    async fn upload_part_from_base64<S1, S2, S3>(
231        &self,
232        bucket_name: S1,
233        object_key: S2,
234        base64_string: S3,
235        params: UploadPartRequest,
236    ) -> Result<UploadPartResult>
237    where
238        S1: AsRef<str> + Send,
239        S2: AsRef<str> + Send,
240        S3: AsRef<str> + Send,
241    {
242        let data = BASE64_STANDARD.decode(base64_string.as_ref())?;
243        self.upload_part_from_buffer(bucket_name, object_key, data, params).await
244    }
245
246    /// When you want to copy a file larger than 1GB, you must use `upload_part_copy`.
247    /// First, initiate a multipart upload and get `uploadId`, then call this method to upload parts of the source object.
248    /// Finally complete the multipart upload by invoking `complete_multipart_uploads`
249    ///
250    /// Offical document: <https://help.aliyun.com/zh/oss/developer-reference/uploadpartcopy>
251    async fn upload_part_copy<S1, S2>(
252        &self,
253        bucket_name: S1,
254        dest_object_key: S2,
255        data: UploadPartCopyRequest,
256        options: Option<UploadPartCopyOptions>,
257    ) -> Result<UploadPartCopyResult>
258    where
259        S1: AsRef<str> + Send,
260        S2: AsRef<str> + Send,
261    {
262        let bucket_name = bucket_name.as_ref();
263        let object_key = dest_object_key.as_ref();
264        let requet = build_upload_part_copy_request(bucket_name, object_key, data, &options)?;
265        let (_, xml) = self.do_request::<String>(requet).await?;
266        UploadPartCopyResult::from_xml(&xml)
267    }
268
269    /// Complete multipart uploads
270    ///
271    /// Official document: <https://help.aliyun.com/zh/oss/developer-reference/completemultipartupload>
272    async fn complete_multipart_uploads<S1, S2>(
273        &self,
274        bucket_name: S1,
275        object_key: S2,
276        data: CompleteMultipartUploadRequest,
277        options: Option<CompleteMultipartUploadOptions>,
278    ) -> Result<CompleteMultipartUploadResult>
279    where
280        S1: AsRef<str> + Send,
281        S2: AsRef<str> + Send,
282    {
283        let with_callback = if let Some(opt) = &options { opt.callback.is_some() } else { false };
284
285        let request = build_complete_multipart_uploads_request(bucket_name.as_ref(), object_key.as_ref(), data, &options)?;
286        let (_, content) = self.do_request::<String>(request).await?;
287
288        if with_callback {
289            Ok(CompleteMultipartUploadResult::CallbackResponse(content))
290        } else {
291            Ok(CompleteMultipartUploadResult::ApiResponse(CompleteMultipartUploadApiResponse::from_xml(
292                &content,
293            )?))
294        }
295    }
296
297    /// About multipart uploads
298    ///
299    /// Official document: <https://help.aliyun.com/zh/oss/developer-reference/abortmultipartupload>
300    async fn abort_multipart_uploads<S1, S2, S3>(&self, bucket_name: S1, object_key: S2, upload_id: S3) -> Result<()>
301    where
302        S1: AsRef<str> + Send,
303        S2: AsRef<str> + Send,
304        S3: AsRef<str> + Send,
305    {
306        let bucket_name = bucket_name.as_ref();
307        let object_key = object_key.as_ref();
308
309        if !validate_bucket_name(bucket_name) {
310            return Err(Error::Other(format!("invalid bucket name: {}", bucket_name)));
311        }
312
313        if !validate_object_key(object_key) {
314            return Err(Error::Other(format!("invalid object key: {}", object_key)));
315        }
316
317        if upload_id.as_ref().is_empty() {
318            return Err(Error::Other("invalid upload id: [empty]".to_string()));
319        }
320
321        let request = OssRequest::new()
322            .method(RequestMethod::Delete)
323            .bucket(bucket_name)
324            .object(object_key)
325            .add_query("uploadId", upload_id);
326
327        self.do_request::<()>(request).await?;
328
329        Ok(())
330    }
331}
332
/// Integration tests for the asynchronous multipart upload API.
///
/// These tests talk to a real OSS endpoint configured through `.env` / `.env.comp` and read
/// machine-specific local files, so they only pass in a prepared environment.
#[cfg(test)]
mod test_multipart_async {
    use std::{
        io::{Read, Seek},
        ops::Range,
        sync::Once,
    };

    use uuid::Uuid;

    use crate::{
        multipart::MultipartUploadsOperations,
        multipart_common::{
            CompleteMultipartUploadOptions, CompleteMultipartUploadRequest, CompleteMultipartUploadResult, InitiateMultipartUploadOptionsBuilder,
            UploadPartCopyOptionsBuilder, UploadPartCopyRequest, UploadPartRequest,
        },
        object::ObjectOperations,
        object_common::{CallbackBodyParameter, CallbackBuilder},
        Client,
    };

    // NOTE(review): `setup` and `setup_comp` share this `Once`, so only the first one called
    // in a test process actually loads its env file — confirm this is intended.
    static INIT: Once = Once::new();

    /// Initialise logging and load `.env` once per test process.
    fn setup() {
        INIT.call_once(|| {
            simple_logger::init_with_level(log::Level::Debug).unwrap();
            dotenvy::dotenv().unwrap();
        });
    }

    /// Initialise logging and load `.env.comp` once per test process.
    fn setup_comp() {
        INIT.call_once(|| {
            simple_logger::init_with_level(log::Level::Debug).unwrap();
            dotenvy::from_filename(".env.comp").unwrap();
        });
    }

    /// Split `total_len` bytes into consecutive ranges of at most `slice_len` bytes each.
    ///
    /// At least one range is always returned; for `total_len == 0` it is the empty `0..0`.
    /// `slice_len` must be greater than zero.
    fn split_ranges(total_len: u64, slice_len: u64) -> Vec<Range<u64>> {
        let mut ranges = vec![];
        let mut start = 0;

        loop {
            let end = (start + slice_len).min(total_len);
            ranges.push(start..end);

            if end >= total_len {
                break;
            }

            start = end;
        }

        ranges
    }

    // #[tokio::test]
    // async fn test_list_multipart_uploads_async_1() {
    //     setup_comp();

    //     let client = Client::from_env();
    //     let response = client.list_multipart_uploads("mi-dev-public", None).await;
    //     assert!(response.is_ok());

    //     let ret = response.unwrap();
    //     log::debug!("{:#?}", ret);

    //     assert!(ret.max_uploads > 0);
    // }

    // #[tokio::test]
    // async fn test_list_multipart_uploads_async_2() {
    //     setup_comp();

    //     let client = Client::from_env();
    //     let options = ListMultipartUploadsOptionsBuilder::new()
    //         .prefix("builder/editor/2023/000-278/videos/c29s08f01-032-663b31e15a44347d59de9e75/")
    //         .delimiter('/')
    //         .max_uploads(20)
    //         .build();

    //     let response = client.list_multipart_uploads("mi-dev-public", Some(options)).await;
    //     assert!(response.is_ok());

    //     let ret = response.unwrap();
    //     log::debug!("{:#?}", ret);

    //     assert!(ret.max_uploads > 0);
    // }

    /// Upload a local file in 50 MiB parts, complete the upload and delete the object.
    #[tokio::test]
    async fn test_multipart_uploads_from_file_async() {
        setup_comp();

        let client = Client::from_env();

        let bucket = "mi-dev-private";
        let object = format!("yuanyu-test/rust-sdk-test/multipart-{}.deb", Uuid::new_v4());
        let file = "/home/yuanyq/Downloads/apifox_2.8.11_amd64.deb";

        let meta = std::fs::metadata(file).unwrap();

        let ranges = split_ranges(meta.len(), 50 * 1024 * 1024);

        log::debug!("{:#?}", ranges);

        let opt = InitiateMultipartUploadOptionsBuilder::new().parameter("sequential", "").build();

        let init_response = client.initiate_multipart_uploads(bucket, &object, Some(opt)).await;
        assert!(init_response.is_ok());

        let upload_id = init_response.unwrap().upload_id;
        log::debug!("upload id = {}", upload_id);

        let mut upload_results = vec![];

        for (i, rng) in ranges.iter().enumerate() {
            // Part numbers are 1-based.
            let part_number = (i + 1) as u32;
            let upload_data = UploadPartRequest {
                part_number,
                upload_id: upload_id.clone(),
            };

            log::debug!("begin to upload part {}", i);

            let upload_response = client.upload_part_from_file(bucket, &object, file, rng.clone(), upload_data).await;

            log::debug!("{:#?}", upload_response);

            assert!(upload_response.is_ok());

            upload_results.push((part_number, upload_response.unwrap().etag));
        }

        log::debug!("all parts uploaded, check it");
        let resp = client.list_parts(bucket, &object, &upload_id, None).await;
        assert!(resp.is_ok());

        let ret = resp.unwrap();
        assert_eq!(ranges.len(), ret.parts.len());

        log::debug!("going to complete multipart upload for upload id: {}", upload_id);

        let comp_response = client
            .complete_multipart_uploads(
                bucket,
                &object,
                CompleteMultipartUploadRequest {
                    upload_id,
                    parts: upload_results,
                },
                None,
            )
            .await;

        log::debug!("{:#?}", comp_response);

        // Without this check a failed completion would go unnoticed.
        assert!(comp_response.is_ok());

        log::debug!("multipart uploads completed");

        client.delete_object(bucket, &object, None).await.unwrap();
    }

    /// Upload a local file in 10 MiB in-memory parts, then abort the upload and verify that
    /// the object does not exist.
    #[tokio::test]
    async fn test_upload_part_from_buffer_async() {
        setup();

        let client = Client::from_env();

        let bucket = "yuanyq";
        let object = format!("rust-sdk-test/multipart-{}.deb", Uuid::new_v4());
        let file = "/home/yuanyq/Downloads/sourcegit_2025.06-1_amd64.deb";

        let meta = std::fs::metadata(file).unwrap();

        let ranges = split_ranges(meta.len(), 10 * 1024 * 1024);

        log::debug!("{:#?}", ranges);

        let init_response = client.initiate_multipart_uploads(bucket, &object, None).await;
        assert!(init_response.is_ok());

        let upload_id = init_response.unwrap().upload_id;
        log::debug!("upload id = {}", upload_id);

        let mut upload_results = vec![];

        for (i, rng) in ranges.iter().enumerate() {
            // Part numbers are 1-based.
            let part_number = (i + 1) as u32;

            log::debug!("begin to upload part {}", i);

            // Read exactly the bytes of this range into memory.
            let mut buf = Vec::new();
            let mut stream = std::fs::File::open(file).unwrap();
            stream.seek(std::io::SeekFrom::Start(rng.start)).unwrap();
            stream.take(rng.end - rng.start).read_to_end(&mut buf).unwrap();

            let upload_data = UploadPartRequest {
                part_number,
                upload_id: upload_id.clone(),
            };

            let upload_response = client.upload_part_from_buffer(bucket, &object, buf, upload_data).await;

            log::debug!("{:#?}", upload_response);

            assert!(upload_response.is_ok());

            upload_results.push((part_number, upload_response.unwrap().etag));
        }

        let list_parts_response = client.list_parts(bucket, &object, &upload_id, None).await;
        log::debug!("{:#?}", list_parts_response);
        assert!(list_parts_response.is_ok());
        let list_parts_result = list_parts_response.unwrap();

        assert_eq!(ranges.len(), list_parts_result.parts.len());

        let abort_response = client.abort_multipart_uploads(bucket, &object, upload_id).await;
        log::debug!("{:#?}", abort_response);
        assert!(abort_response.is_ok());

        let resp = client.exists(bucket, &object, None).await;
        assert!(resp.is_ok());
        assert!(!resp.unwrap());
    }

    /// Copy an existing object in two parts with `upload_part_copy`, complete the upload and
    /// delete the destination object.
    #[tokio::test]
    async fn test_upload_part_copy_asyn() {
        setup();

        let client = Client::from_env();

        let bucket = "yuanyq";

        let source_object_key = "rust-sdk-test/img-appended-from-file.jpg";
        let dest_object_key = format!("rust-sdk-test/img-{}.jpg", Uuid::new_v4());

        let init_response = client.initiate_multipart_uploads(bucket, &dest_object_key, None).await;
        assert!(init_response.is_ok());

        let upload_id = init_response.unwrap().upload_id;

        // part 1
        let upload_response = client
            .upload_part_copy(
                bucket,
                &dest_object_key,
                UploadPartCopyRequest::new(1, &upload_id, source_object_key),
                Some(UploadPartCopyOptionsBuilder::new().copy_source_range("bytes=0-185000").build()),
            )
            .await;
        assert!(upload_response.is_ok());
        log::debug!("upload response 1: {:#?}", upload_response);

        let etag1 = upload_response.unwrap().etag;

        // part 2: the remainder of the source object
        let upload_response = client
            .upload_part_copy(
                bucket,
                &dest_object_key,
                UploadPartCopyRequest::new(2, &upload_id, source_object_key),
                Some(UploadPartCopyOptionsBuilder::new().copy_source_range("bytes=185001-").build()),
            )
            .await;
        assert!(upload_response.is_ok());
        log::debug!("upload response 2: {:#?}", upload_response);

        let etag2 = upload_response.unwrap().etag;

        let comp_data = CompleteMultipartUploadRequest {
            upload_id,
            parts: vec![(1, etag1), (2, etag2)],
        };

        let comp_response = client.complete_multipart_uploads(bucket, &dest_object_key, comp_data, None).await;
        log::debug!("complete multipart upload response: {:#?}", comp_response);

        // Without this check a failed completion would go unnoticed.
        assert!(comp_response.is_ok());

        client.delete_object(bucket, &dest_object_key, None).await.unwrap();
    }

    /// Upload a local file in 5 MiB parts and complete the upload with a callback, expecting
    /// the callback response to contain the JSON-encoded object key.
    #[tokio::test]
    async fn test_multipart_upload_with_callback() {
        log::debug!("test multipart upload with callback while completing");
        setup();

        let callback_url = std::env::var("CALLBACK_TEST_URL").unwrap();

        let client = Client::from_env();

        let bucket = "yuanyq";
        let object = format!("rust-sdk-test/multipart-{}.deb", Uuid::new_v4());
        let file = "/home/yuanyq/Downloads/sourcegit_2025.06-1_amd64.deb";

        let meta = std::fs::metadata(file).unwrap();

        let ranges = split_ranges(meta.len(), 5 * 1024 * 1024);

        log::debug!("{:#?}", ranges);

        let init_response = client.initiate_multipart_uploads(bucket, &object, None).await;
        assert!(init_response.is_ok());

        let upload_id = init_response.unwrap().upload_id;
        log::debug!("upload id = {}", upload_id);

        let mut upload_results = vec![];

        for (i, rng) in ranges.iter().enumerate() {
            // Part numbers are 1-based.
            let part_number = (i + 1) as u32;
            let upload_data = UploadPartRequest {
                part_number,
                upload_id: upload_id.clone(),
            };

            log::debug!("begin to upload part {}", i);

            let upload_response = client.upload_part_from_file(bucket, &object, file, rng.clone(), upload_data).await;

            log::debug!("{:#?}", upload_response);

            assert!(upload_response.is_ok());

            upload_results.push((part_number, upload_response.unwrap().etag));
        }

        log::debug!("all parts uploaded, check it");
        let resp = client.list_parts(bucket, &object, &upload_id, None).await;
        assert!(resp.is_ok());

        let ret = resp.unwrap();
        assert_eq!(ranges.len(), ret.parts.len());

        log::debug!("going to complete multipart upload for upload id: {}", upload_id);

        let cb = CallbackBuilder::new(&callback_url)
            .body_parameter(CallbackBodyParameter::OssBucket("the_bucket"))
            .body_parameter(CallbackBodyParameter::OssObject("the_object_key"))
            .body_parameter(CallbackBodyParameter::OssETag("the_etag"))
            .body_parameter(CallbackBodyParameter::OssSize("the_size"))
            .body_parameter(CallbackBodyParameter::OssCrc64("the_crc"))
            .body_parameter(CallbackBodyParameter::OssClientIp("the_client_ip"))
            .body_parameter(CallbackBodyParameter::OssContentMd5("the_content_md5"))
            .body_parameter(CallbackBodyParameter::OssMimeType("the_mime_type"))
            .body_parameter(CallbackBodyParameter::OssImageWidth("the_image_width"))
            .body_parameter(CallbackBodyParameter::OssImageHeight("the_image_height"))
            .body_parameter(CallbackBodyParameter::OssImageFormat("the_image_format"))
            .body_parameter(CallbackBodyParameter::Custom("my-key", "my-prop", "hello world".to_string()))
            .body_parameter(CallbackBodyParameter::Constant("my-key-constant", "the-value"))
            .body_parameter(CallbackBodyParameter::Literal("k1".to_string(), "${x:v1}".to_string()))
            .custom_variable("v1", "this is value of v1")
            .build();

        let options = CompleteMultipartUploadOptions { callback: Some(cb) };

        let comp_response = client
            .complete_multipart_uploads(
                bucket,
                &object,
                CompleteMultipartUploadRequest {
                    upload_id,
                    parts: upload_results,
                },
                Some(options),
            )
            .await;

        log::debug!("{:#?}", comp_response);

        log::debug!("multipart uploads completed");

        let ret = comp_response.unwrap();

        if let CompleteMultipartUploadResult::CallbackResponse(s) = ret {
            assert!(s.contains(&serde_json::to_string(&object).unwrap()));
        } else {
            panic!("no callback json content returned");
        }

        client.delete_object(bucket, &object, None).await.unwrap();
    }
}
747
#[cfg(test)]
mod test {
    use uuid::Uuid;

    /// An object key containing `/` must be serialised by `serde_json` as the plain key
    /// wrapped in double quotes, i.e. without escaping the slashes.
    #[test]
    fn test_serde_with_slash() {
        let object = format!("rust-sdk-test/multipart-{}.deb", Uuid::new_v4());
        let s = serde_json::to_string(&object).unwrap();
        println!("{}", s);

        // The key only contains ASCII letters, digits, `-`, `.` and `/`, none of which
        // serde_json escapes.
        assert_eq!(s, format!("\"{}\"", object));
    }
}