1use std::{ops::Range, path::Path};
4
5use async_trait::async_trait;
6use base64::{prelude::BASE64_STANDARD, Engine};
7
8use crate::{
9 error::Error,
10 multipart_common::{
11 build_complete_multipart_uploads_request, build_initiate_multipart_uploads_request, build_list_multipart_uploads_request, build_list_parts_request,
12 build_upload_part_copy_request, build_upload_part_request, CompleteMultipartUploadApiResponse, CompleteMultipartUploadOptions,
13 CompleteMultipartUploadRequest, CompleteMultipartUploadResult, InitiateMultipartUploadOptions, InitiateMultipartUploadResult,
14 ListMultipartUploadsOptions, ListMultipartUploadsResult, ListPartsOptions, ListPartsResult, UploadPartCopyOptions, UploadPartCopyRequest,
15 UploadPartCopyResult, UploadPartRequest, UploadPartResult,
16 },
17 request::{OssRequest, RequestMethod},
18 util::{validate_bucket_name, validate_object_key},
19 Client, RequestBody, Result,
20};
21
22#[async_trait]
23pub trait MultipartUploadsOperations {
24 async fn list_multipart_uploads<S>(&self, bucket_name: S, options: Option<ListMultipartUploadsOptions>) -> Result<ListMultipartUploadsResult>
28 where
29 S: AsRef<str> + Send;
30
31 async fn list_parts<S1, S2, S3>(&self, bucket_name: S1, object_key: S2, upload_id: S3, options: Option<ListPartsOptions>) -> Result<ListPartsResult>
35 where
36 S1: AsRef<str> + Send,
37 S2: AsRef<str> + Send,
38 S3: AsRef<str> + Send;
39
40 async fn initiate_multipart_uploads<S1, S2>(
44 &self,
45 bucket_name: S1,
46 object_key: S2,
47 options: Option<InitiateMultipartUploadOptions>,
48 ) -> Result<InitiateMultipartUploadResult>
49 where
50 S1: AsRef<str> + Send,
51 S2: AsRef<str> + Send;
52
53 async fn upload_part_from_file<S1, S2, P>(
57 &self,
58 bucket_name: S1,
59 object_key: S2,
60 file_path: P,
61 range: Range<u64>,
62 params: UploadPartRequest,
63 ) -> Result<UploadPartResult>
64 where
65 S1: AsRef<str> + Send,
66 S2: AsRef<str> + Send,
67 P: AsRef<Path> + Send;
68
69 async fn upload_part_from_buffer<S1, S2, B>(&self, bucket_name: S1, object_key: S2, buffer: B, params: UploadPartRequest) -> Result<UploadPartResult>
73 where
74 S1: AsRef<str> + Send,
75 S2: AsRef<str> + Send,
76 B: Into<Vec<u8>> + Send;
77
78 async fn upload_part_from_base64<S1, S2, S3>(
82 &self,
83 bucket_name: S1,
84 object_key: S2,
85 base64_string: S3,
86 params: UploadPartRequest,
87 ) -> Result<UploadPartResult>
88 where
89 S1: AsRef<str> + Send,
90 S2: AsRef<str> + Send,
91 S3: AsRef<str> + Send;
92
93 async fn upload_part_copy<S1, S2>(
99 &self,
100 bucket_name: S1,
101 dest_object_key: S2,
102 data: UploadPartCopyRequest,
103 options: Option<UploadPartCopyOptions>,
104 ) -> Result<UploadPartCopyResult>
105 where
106 S1: AsRef<str> + Send,
107 S2: AsRef<str> + Send;
108
109 async fn complete_multipart_uploads<S1, S2>(
113 &self,
114 bucket_name: S1,
115 object_key: S2,
116 data: CompleteMultipartUploadRequest,
117 options: Option<CompleteMultipartUploadOptions>,
118 ) -> Result<CompleteMultipartUploadResult>
119 where
120 S1: AsRef<str> + Send,
121 S2: AsRef<str> + Send;
122
123 async fn abort_multipart_uploads<S1, S2, S3>(&self, bucket_name: S1, object_key: S2, upload_id: S3) -> Result<()>
127 where
128 S1: AsRef<str> + Send,
129 S2: AsRef<str> + Send,
130 S3: AsRef<str> + Send;
131}
132
133#[async_trait]
134impl MultipartUploadsOperations for Client {
135 async fn list_multipart_uploads<S>(&self, bucket_name: S, options: Option<ListMultipartUploadsOptions>) -> Result<ListMultipartUploadsResult>
139 where
140 S: AsRef<str> + Send,
141 {
142 if !validate_bucket_name(bucket_name.as_ref()) {
143 return Err(Error::Other(format!("invalid bucket name: {}", bucket_name.as_ref())));
144 }
145 let request = build_list_multipart_uploads_request(bucket_name.as_ref(), &options)?;
146 let (_, xml) = self.do_request::<String>(request).await?;
147
148 ListMultipartUploadsResult::from_xml(&xml)
149 }
150
151 async fn list_parts<S1, S2, S3>(&self, bucket_name: S1, object_key: S2, upload_id: S3, options: Option<ListPartsOptions>) -> Result<ListPartsResult>
155 where
156 S1: AsRef<str> + Send,
157 S2: AsRef<str> + Send,
158 S3: AsRef<str> + Send,
159 {
160 let request = build_list_parts_request(bucket_name.as_ref(), object_key.as_ref(), upload_id.as_ref(), &options)?;
161 let (_, xml) = self.do_request::<String>(request).await?;
162 ListPartsResult::from_xml(&xml)
163 }
164
165 async fn initiate_multipart_uploads<S1, S2>(
169 &self,
170 bucket_name: S1,
171 object_key: S2,
172 options: Option<InitiateMultipartUploadOptions>,
173 ) -> Result<InitiateMultipartUploadResult>
174 where
175 S1: AsRef<str> + Send,
176 S2: AsRef<str> + Send,
177 {
178 let request = build_initiate_multipart_uploads_request(bucket_name.as_ref(), object_key.as_ref(), &options)?;
179 let (_, xml) = self.do_request::<String>(request).await?;
180 InitiateMultipartUploadResult::from_xml(&xml)
181 }
182
183 async fn upload_part_from_file<S1, S2, P>(
187 &self,
188 bucket_name: S1,
189 object_key: S2,
190 file_path: P,
191 range: Range<u64>,
192 params: UploadPartRequest,
193 ) -> Result<UploadPartResult>
194 where
195 S1: AsRef<str> + Send,
196 S2: AsRef<str> + Send,
197 P: AsRef<Path> + Send,
198 {
199 let request = build_upload_part_request(
200 bucket_name.as_ref(),
201 object_key.as_ref(),
202 RequestBody::File(file_path.as_ref().to_path_buf(), Some(range)),
203 params,
204 )?;
205
206 let (headers, _) = self.do_request::<()>(request).await?;
207
208 Ok(headers.into())
209 }
210
211 async fn upload_part_from_buffer<S1, S2, B>(&self, bucket_name: S1, object_key: S2, buffer: B, params: UploadPartRequest) -> Result<UploadPartResult>
215 where
216 S1: AsRef<str> + Send,
217 S2: AsRef<str> + Send,
218 B: Into<Vec<u8>> + Send,
219 {
220 let request = build_upload_part_request(bucket_name.as_ref(), object_key.as_ref(), RequestBody::Bytes(buffer.into()), params)?;
221
222 let (headers, _) = self.do_request::<()>(request).await?;
223
224 Ok(headers.into())
225 }
226
227 async fn upload_part_from_base64<S1, S2, S3>(
231 &self,
232 bucket_name: S1,
233 object_key: S2,
234 base64_string: S3,
235 params: UploadPartRequest,
236 ) -> Result<UploadPartResult>
237 where
238 S1: AsRef<str> + Send,
239 S2: AsRef<str> + Send,
240 S3: AsRef<str> + Send,
241 {
242 let data = BASE64_STANDARD.decode(base64_string.as_ref())?;
243 self.upload_part_from_buffer(bucket_name, object_key, data, params).await
244 }
245
246 async fn upload_part_copy<S1, S2>(
252 &self,
253 bucket_name: S1,
254 dest_object_key: S2,
255 data: UploadPartCopyRequest,
256 options: Option<UploadPartCopyOptions>,
257 ) -> Result<UploadPartCopyResult>
258 where
259 S1: AsRef<str> + Send,
260 S2: AsRef<str> + Send,
261 {
262 let bucket_name = bucket_name.as_ref();
263 let object_key = dest_object_key.as_ref();
264 let requet = build_upload_part_copy_request(bucket_name, object_key, data, &options)?;
265 let (_, xml) = self.do_request::<String>(requet).await?;
266 UploadPartCopyResult::from_xml(&xml)
267 }
268
269 async fn complete_multipart_uploads<S1, S2>(
273 &self,
274 bucket_name: S1,
275 object_key: S2,
276 data: CompleteMultipartUploadRequest,
277 options: Option<CompleteMultipartUploadOptions>,
278 ) -> Result<CompleteMultipartUploadResult>
279 where
280 S1: AsRef<str> + Send,
281 S2: AsRef<str> + Send,
282 {
283 let with_callback = if let Some(opt) = &options { opt.callback.is_some() } else { false };
284
285 let request = build_complete_multipart_uploads_request(bucket_name.as_ref(), object_key.as_ref(), data, &options)?;
286 let (_, content) = self.do_request::<String>(request).await?;
287
288 if with_callback {
289 Ok(CompleteMultipartUploadResult::CallbackResponse(content))
290 } else {
291 Ok(CompleteMultipartUploadResult::ApiResponse(CompleteMultipartUploadApiResponse::from_xml(
292 &content,
293 )?))
294 }
295 }
296
297 async fn abort_multipart_uploads<S1, S2, S3>(&self, bucket_name: S1, object_key: S2, upload_id: S3) -> Result<()>
301 where
302 S1: AsRef<str> + Send,
303 S2: AsRef<str> + Send,
304 S3: AsRef<str> + Send,
305 {
306 let bucket_name = bucket_name.as_ref();
307 let object_key = object_key.as_ref();
308
309 if !validate_bucket_name(bucket_name) {
310 return Err(Error::Other(format!("invalid bucket name: {}", bucket_name)));
311 }
312
313 if !validate_object_key(object_key) {
314 return Err(Error::Other(format!("invalid object key: {}", object_key)));
315 }
316
317 if upload_id.as_ref().is_empty() {
318 return Err(Error::Other("invalid upload id: [empty]".to_string()));
319 }
320
321 let request = OssRequest::new()
322 .method(RequestMethod::Delete)
323 .bucket(bucket_name)
324 .object(object_key)
325 .add_query("uploadId", upload_id);
326
327 self.do_request::<()>(request).await?;
328
329 Ok(())
330 }
331}
332
// Integration tests for the asynchronous multipart upload API.
//
// These tests talk to a real OSS endpoint. They need credentials from a dotenv file and
// the hard-coded local files and buckets referenced below.
#[cfg(test)]
mod test_multipart_async {
    use std::{
        io::{Read, Seek},
        ops::Range,
        sync::Once,
    };

    use uuid::Uuid;

    use crate::{
        multipart::MultipartUploadsOperations,
        multipart_common::{
            CompleteMultipartUploadOptions, CompleteMultipartUploadRequest,
            CompleteMultipartUploadResult, InitiateMultipartUploadOptionsBuilder,
            UploadPartCopyOptionsBuilder, UploadPartCopyRequest, UploadPartRequest,
        },
        object::ObjectOperations,
        object_common::{CallbackBodyParameter, CallbackBuilder},
        Client,
    };

    // NOTE(review): `setup` and `setup_comp` share this single `Once`, so within one test
    // process only the first of them that runs actually loads its env file. Tests that
    // need the other environment then run with the wrong configuration. Confirm whether
    // these tests are meant to be run one at a time.
    static INIT: Once = Once::new();

    /// Initializes logging and loads the default `.env` file (at most once per process).
    fn setup() {
        INIT.call_once(|| {
            simple_logger::init_with_level(log::Level::Debug).unwrap();
            dotenvy::dotenv().unwrap();
        });
    }

    /// Initializes logging and loads the `.env.comp` file (at most once per process).
    // Kept so that switching a test between `setup` and `setup_comp` never produces an
    // unused-function warning.
    #[allow(dead_code)]
    fn setup_comp() {
        INIT.call_once(|| {
            simple_logger::init_with_level(log::Level::Debug).unwrap();
            dotenvy::from_filename(".env.comp").unwrap();
        });
    }

    /// Splits `total_len` bytes into consecutive ranges of at most `slice_len` bytes.
    ///
    /// The last range is shortened to end at `total_len`. A `total_len` of zero yields a
    /// single empty range, so the caller always gets at least one part.
    ///
    /// # Panics
    ///
    /// Panics when `slice_len` is zero, because the split could never make progress.
    fn split_ranges(total_len: u64, slice_len: u64) -> Vec<Range<u64>> {
        assert!(slice_len > 0, "slice length must be greater than zero");

        let mut ranges = Vec::new();
        let mut start = 0u64;
        loop {
            let end = start.saturating_add(slice_len).min(total_len);
            ranges.push(start..end);

            if end >= total_len {
                break;
            }

            start = end;
        }

        ranges
    }

    /// Uploads a local file part by part, checks the part list, and completes the upload.
    #[tokio::test]
    async fn test_multipart_uploads_from_file_async() {
        setup_comp();

        let client = Client::from_env();

        let bucket = "mi-dev-private";
        let object = format!("yuanyu-test/rust-sdk-test/multipart-{}.deb", Uuid::new_v4());
        let file = "/home/yuanyq/Downloads/apifox_2.8.11_amd64.deb";

        let meta = std::fs::metadata(file).unwrap();

        let slice_len: u64 = 50 * 1024 * 1024;
        let ranges = split_ranges(meta.len(), slice_len);

        log::debug!("{:#?}", ranges);

        let opt = InitiateMultipartUploadOptionsBuilder::new().parameter("sequential", "").build();

        let init_response = client.initiate_multipart_uploads(bucket, &object, Some(opt)).await;
        assert!(init_response.is_ok());

        let init_result = init_response.unwrap();
        let upload_id = init_result.upload_id.clone();
        log::debug!("upload id = {}", upload_id);

        let mut upload_results = vec![];

        for (i, rng) in ranges.iter().enumerate() {
            let part_number = u32::try_from(i + 1).expect("part number fits in u32");
            let upload_data = UploadPartRequest {
                part_number,
                upload_id: upload_id.clone(),
            };

            log::debug!("begin to upload part {}", i);

            let upload_response = client
                .upload_part_from_file(bucket, &object, file, rng.clone(), upload_data)
                .await;

            log::debug!("{:#?}", upload_response);

            assert!(upload_response.is_ok());

            let upload_result = upload_response.unwrap();
            upload_results.push((part_number, upload_result.etag));
        }

        log::debug!("all parts uploaded, check it");
        let resp = client.list_parts(bucket, &object, &upload_id, None).await;
        assert!(resp.is_ok());

        let ret = resp.unwrap();
        assert_eq!(ranges.len(), ret.parts.len());

        log::debug!("going to complete multipart upload for upload id: {}", upload_id);

        let comp_response = client
            .complete_multipart_uploads(
                bucket,
                &object,
                CompleteMultipartUploadRequest {
                    upload_id,
                    parts: upload_results,
                },
                None,
            )
            .await;

        log::debug!("{:#?}", comp_response);

        // The completion itself is what this test is about, so it must succeed.
        assert!(comp_response.is_ok());

        log::debug!("multipart uploads completed");

        client.delete_object(bucket, &object, None).await.unwrap();
    }

    /// Uploads parts from in-memory buffers, then aborts the upload and checks that no
    /// object was created.
    #[tokio::test]
    async fn test_upload_part_from_buffer_async() {
        setup();

        let client = Client::from_env();

        let bucket = "yuanyq";
        let object = format!("rust-sdk-test/multipart-{}.deb", Uuid::new_v4());
        let file = "/home/yuanyq/Downloads/sourcegit_2025.06-1_amd64.deb";

        let meta = std::fs::metadata(file).unwrap();

        let slice_len: u64 = 10 * 1024 * 1024;
        let ranges = split_ranges(meta.len(), slice_len);

        log::debug!("{:#?}", ranges);

        let init_response = client.initiate_multipart_uploads(bucket, &object, None).await;
        assert!(init_response.is_ok());

        let init_result = init_response.unwrap();
        let upload_id = init_result.upload_id.clone();
        log::debug!("upload id = {}", upload_id);

        let mut upload_results = vec![];

        for (i, rng) in ranges.iter().enumerate() {
            let part_no = u32::try_from(i + 1).expect("part number fits in u32");

            log::debug!("begin to upload part {}", i);

            // Read exactly this part's bytes from the file into memory.
            let mut buf = Vec::new();
            let mut stream = std::fs::File::open(file).unwrap();
            stream.seek(std::io::SeekFrom::Start(rng.start)).unwrap();
            let mut partial = stream.take(rng.end - rng.start);
            partial.read_to_end(&mut buf).unwrap();

            let upload_data = UploadPartRequest {
                part_number: part_no,
                upload_id: upload_id.clone(),
            };

            let upload_response = client
                .upload_part_from_buffer(bucket, &object, buf, upload_data)
                .await;

            log::debug!("{:#?}", upload_response);

            assert!(upload_response.is_ok());

            let upload_result = upload_response.unwrap();
            upload_results.push((part_no, upload_result.etag));
        }

        let list_parts_response = client.list_parts(bucket, &object, upload_id.clone(), None).await;
        log::debug!("{:#?}", list_parts_response);
        assert!(list_parts_response.is_ok());
        let list_parts_result = list_parts_response.unwrap();

        assert_eq!(ranges.len(), list_parts_result.parts.len());

        let abort_response = client
            .abort_multipart_uploads(bucket, &object, upload_id.clone())
            .await;
        log::debug!("{:#?}", abort_response);
        assert!(abort_response.is_ok());

        // After aborting, the object must not exist.
        let resp = client.exists(bucket, &object, None).await;
        assert!(resp.is_ok());
        assert!(!resp.unwrap());
    }

    /// Builds an object from two parts copied out of an existing object.
    #[tokio::test]
    async fn test_upload_part_copy_asyn() {
        setup();

        let client = Client::from_env();

        let bucket = "yuanyq";

        let source_object_key = "rust-sdk-test/img-appended-from-file.jpg";
        let dest_object_key = format!("rust-sdk-test/img-{}.jpg", Uuid::new_v4());

        let init_response = client.initiate_multipart_uploads(bucket, &dest_object_key, None).await;
        assert!(init_response.is_ok());

        let upload_id = init_response.unwrap().upload_id.clone();

        let upload_response = client
            .upload_part_copy(
                bucket,
                &dest_object_key,
                UploadPartCopyRequest::new(1, &upload_id, source_object_key),
                Some(UploadPartCopyOptionsBuilder::new().copy_source_range("bytes=0-185000").build()),
            )
            .await;
        assert!(upload_response.is_ok());
        log::debug!("upload response 1: {:#?}", upload_response);

        let etag1 = upload_response.unwrap().etag;

        let upload_response = client
            .upload_part_copy(
                bucket,
                &dest_object_key,
                UploadPartCopyRequest::new(2, &upload_id, source_object_key),
                Some(UploadPartCopyOptionsBuilder::new().copy_source_range("bytes=185001-").build()),
            )
            .await;
        assert!(upload_response.is_ok());
        log::debug!("upload response 2: {:#?}", upload_response);

        let etag2 = upload_response.unwrap().etag;

        let comp_data = CompleteMultipartUploadRequest {
            upload_id,
            parts: vec![(1, etag1), (2, etag2)],
        };

        let comp_response = client
            .complete_multipart_uploads(bucket, &dest_object_key, comp_data, None)
            .await;
        log::debug!("complete multipart upload response: {:#?}", comp_response);

        // Without this check a failed completion would go unnoticed.
        assert!(comp_response.is_ok());

        client.delete_object(bucket, &dest_object_key, None).await.unwrap();
    }

    /// Completes a multipart upload with a callback and checks the callback response.
    #[tokio::test]
    async fn test_multipart_upload_with_callback() {
        log::debug!("test multipart upload with callback while completing");
        setup();

        let callback_url = std::env::var("CALLBACK_TEST_URL").unwrap();

        let client = Client::from_env();

        let bucket = "yuanyq";
        let object = format!("rust-sdk-test/multipart-{}.deb", Uuid::new_v4());
        let file = "/home/yuanyq/Downloads/sourcegit_2025.06-1_amd64.deb";

        let meta = std::fs::metadata(file).unwrap();

        let slice_len: u64 = 5 * 1024 * 1024;
        let ranges = split_ranges(meta.len(), slice_len);

        log::debug!("{:#?}", ranges);

        let init_response = client.initiate_multipart_uploads(bucket, &object, None).await;
        assert!(init_response.is_ok());

        let init_result = init_response.unwrap();
        let upload_id = init_result.upload_id.clone();
        log::debug!("upload id = {}", upload_id);

        let mut upload_results = vec![];

        for (i, rng) in ranges.iter().enumerate() {
            let part_number = u32::try_from(i + 1).expect("part number fits in u32");
            let upload_data = UploadPartRequest {
                part_number,
                upload_id: upload_id.clone(),
            };

            log::debug!("begin to upload part {}", i);

            let upload_response = client
                .upload_part_from_file(bucket, &object, file, rng.clone(), upload_data)
                .await;

            log::debug!("{:#?}", upload_response);

            assert!(upload_response.is_ok());

            let upload_result = upload_response.unwrap();
            upload_results.push((part_number, upload_result.etag));
        }

        log::debug!("all parts uploaded, check it");
        let resp = client.list_parts(bucket, &object, &upload_id, None).await;
        assert!(resp.is_ok());

        let ret = resp.unwrap();
        assert_eq!(ranges.len(), ret.parts.len());

        log::debug!("going to complete multipart upload for upload id: {}", upload_id);

        let cb = CallbackBuilder::new(&callback_url)
            .body_parameter(CallbackBodyParameter::OssBucket("the_bucket"))
            .body_parameter(CallbackBodyParameter::OssObject("the_object_key"))
            .body_parameter(CallbackBodyParameter::OssETag("the_etag"))
            .body_parameter(CallbackBodyParameter::OssSize("the_size"))
            .body_parameter(CallbackBodyParameter::OssCrc64("the_crc"))
            .body_parameter(CallbackBodyParameter::OssClientIp("the_client_ip"))
            .body_parameter(CallbackBodyParameter::OssContentMd5("the_content_md5"))
            .body_parameter(CallbackBodyParameter::OssMimeType("the_mime_type"))
            .body_parameter(CallbackBodyParameter::OssImageWidth("the_image_width"))
            .body_parameter(CallbackBodyParameter::OssImageHeight("the_image_height"))
            .body_parameter(CallbackBodyParameter::OssImageFormat("the_image_format"))
            .body_parameter(CallbackBodyParameter::Custom("my-key", "my-prop", "hello world".to_string()))
            .body_parameter(CallbackBodyParameter::Constant("my-key-constant", "the-value"))
            .body_parameter(CallbackBodyParameter::Literal("k1".to_string(), "${x:v1}".to_string()))
            .custom_variable("v1", "this is value of v1")
            .build();

        let options = CompleteMultipartUploadOptions { callback: Some(cb) };

        let comp_response = client
            .complete_multipart_uploads(
                bucket,
                &object,
                CompleteMultipartUploadRequest {
                    upload_id,
                    parts: upload_results,
                },
                Some(options),
            )
            .await;

        log::debug!("{:#?}", comp_response);

        log::debug!("multipart uploads completed");

        let ret = comp_response.unwrap();

        // With a callback configured, the raw callback body is expected, and it should
        // mention the uploaded object key as a JSON string.
        if let CompleteMultipartUploadResult::CallbackResponse(s) = ret {
            assert!(s.contains(&serde_json::to_string(&object).unwrap()));
        } else {
            panic!("no callback json content returned");
        }

        client.delete_object(bucket, &object, None).await.unwrap();
    }
}
747
#[cfg(test)]
mod test {
    use uuid::Uuid;

    /// Checks how `serde_json` serializes an object key that contains `/`.
    ///
    /// The slash must be left unescaped, so the JSON form is simply the key wrapped in
    /// double quotes.
    #[test]
    fn test_serde_with_slash() {
        let object = format!("rust-sdk-test/multipart-{}.deb", Uuid::new_v4());
        let s = serde_json::to_string(&object).unwrap();
        println!("{}", s);

        // The key contains only letters, digits, `-`, `.` and `/`; none of them may be escaped.
        assert_eq!(s, format!("\"{}\"", object));
    }
}