1use std::{ops::Range, path::Path};
4
5use async_trait::async_trait;
6use base64::{prelude::BASE64_STANDARD, Engine};
7
8use crate::{
9 error::Error,
10 multipart_common::{
11 build_complete_multipart_uploads_request, build_initiate_multipart_uploads_request, build_list_multipart_uploads_request, build_list_parts_request,
12 build_upload_part_copy_request, build_upload_part_request, CompleteMultipartUploadApiResponse, CompleteMultipartUploadOptions,
13 CompleteMultipartUploadRequest, CompleteMultipartUploadResult, InitiateMultipartUploadOptions, InitiateMultipartUploadResult,
14 ListMultipartUploadsOptions, ListMultipartUploadsResult, ListPartsOptions, ListPartsResult, UploadPartCopyOptions, UploadPartCopyRequest,
15 UploadPartCopyResult, UploadPartRequest, UploadPartResult,
16 },
17 request::{OssRequest, RequestMethod},
18 util::{validate_bucket_name, validate_object_key},
19 Client, RequestBody, Result,
20};
21
22#[async_trait]
23pub trait MultipartUploadsOperations {
24 async fn list_multipart_uploads<S>(&self, bucket_name: S, options: Option<ListMultipartUploadsOptions>) -> Result<ListMultipartUploadsResult>
28 where
29 S: AsRef<str> + Send;
30
31 async fn list_parts<S1, S2, S3>(&self, bucket_name: S1, object_key: S2, upload_id: S3, options: Option<ListPartsOptions>) -> Result<ListPartsResult>
35 where
36 S1: AsRef<str> + Send,
37 S2: AsRef<str> + Send,
38 S3: AsRef<str> + Send;
39
40 async fn initiate_multipart_uploads<S1, S2>(
44 &self,
45 bucket_name: S1,
46 object_key: S2,
47 options: Option<InitiateMultipartUploadOptions>,
48 ) -> Result<InitiateMultipartUploadResult>
49 where
50 S1: AsRef<str> + Send,
51 S2: AsRef<str> + Send;
52
53 async fn upload_part_from_file<S1, S2, P>(
57 &self,
58 bucket_name: S1,
59 object_key: S2,
60 file_path: P,
61 range: Range<u64>,
62 params: UploadPartRequest,
63 ) -> Result<UploadPartResult>
64 where
65 S1: AsRef<str> + Send,
66 S2: AsRef<str> + Send,
67 P: AsRef<Path> + Send;
68
69 async fn upload_part_from_buffer<S1, S2, B>(&self, bucket_name: S1, object_key: S2, buffer: B, params: UploadPartRequest) -> Result<UploadPartResult>
73 where
74 S1: AsRef<str> + Send,
75 S2: AsRef<str> + Send,
76 B: Into<Vec<u8>> + Send;
77
78 async fn upload_part_from_base64<S1, S2, S3>(
82 &self,
83 bucket_name: S1,
84 object_key: S2,
85 base64_string: S3,
86 params: UploadPartRequest,
87 ) -> Result<UploadPartResult>
88 where
89 S1: AsRef<str> + Send,
90 S2: AsRef<str> + Send,
91 S3: AsRef<str> + Send;
92
93 async fn upload_part_copy<S1, S2>(
99 &self,
100 bucket_name: S1,
101 dest_object_key: S2,
102 data: UploadPartCopyRequest,
103 options: Option<UploadPartCopyOptions>,
104 ) -> Result<UploadPartCopyResult>
105 where
106 S1: AsRef<str> + Send,
107 S2: AsRef<str> + Send;
108
109 async fn complete_multipart_uploads<S1, S2>(
113 &self,
114 bucket_name: S1,
115 object_key: S2,
116 data: CompleteMultipartUploadRequest,
117 options: Option<CompleteMultipartUploadOptions>,
118 ) -> Result<CompleteMultipartUploadResult>
119 where
120 S1: AsRef<str> + Send,
121 S2: AsRef<str> + Send;
122
123 async fn abort_multipart_uploads<S1, S2, S3>(&self, bucket_name: S1, object_key: S2, upload_id: S3) -> Result<()>
127 where
128 S1: AsRef<str> + Send,
129 S2: AsRef<str> + Send,
130 S3: AsRef<str> + Send;
131}
132
133#[async_trait]
134impl MultipartUploadsOperations for Client {
135 async fn list_multipart_uploads<S>(&self, bucket_name: S, options: Option<ListMultipartUploadsOptions>) -> Result<ListMultipartUploadsResult>
139 where
140 S: AsRef<str> + Send,
141 {
142 if !validate_bucket_name(bucket_name.as_ref()) {
143 return Err(Error::Other(format!("invalid bucket name: {}", bucket_name.as_ref())));
144 }
145 let request = build_list_multipart_uploads_request(bucket_name.as_ref(), &options)?;
146 let (_, xml) = self.do_request::<String>(request).await?;
147
148 ListMultipartUploadsResult::from_xml(&xml)
149 }
150
151 async fn list_parts<S1, S2, S3>(&self, bucket_name: S1, object_key: S2, upload_id: S3, options: Option<ListPartsOptions>) -> Result<ListPartsResult>
155 where
156 S1: AsRef<str> + Send,
157 S2: AsRef<str> + Send,
158 S3: AsRef<str> + Send,
159 {
160 let request = build_list_parts_request(bucket_name.as_ref(), object_key.as_ref(), upload_id.as_ref(), &options)?;
161 let (_, xml) = self.do_request::<String>(request).await?;
162 ListPartsResult::from_xml(&xml)
163 }
164
165 async fn initiate_multipart_uploads<S1, S2>(
169 &self,
170 bucket_name: S1,
171 object_key: S2,
172 options: Option<InitiateMultipartUploadOptions>,
173 ) -> Result<InitiateMultipartUploadResult>
174 where
175 S1: AsRef<str> + Send,
176 S2: AsRef<str> + Send,
177 {
178 let request = build_initiate_multipart_uploads_request(bucket_name.as_ref(), object_key.as_ref(), &options)?;
179 let (_, xml) = self.do_request::<String>(request).await?;
180 InitiateMultipartUploadResult::from_xml(&xml)
181 }
182
183 async fn upload_part_from_file<S1, S2, P>(
187 &self,
188 bucket_name: S1,
189 object_key: S2,
190 file_path: P,
191 range: Range<u64>,
192 params: UploadPartRequest,
193 ) -> Result<UploadPartResult>
194 where
195 S1: AsRef<str> + Send,
196 S2: AsRef<str> + Send,
197 P: AsRef<Path> + Send,
198 {
199 let request = build_upload_part_request(
200 bucket_name.as_ref(),
201 object_key.as_ref(),
202 RequestBody::File(file_path.as_ref().to_path_buf(), Some(range)),
203 params,
204 )?;
205
206 let (headers, _) = self.do_request::<()>(request).await?;
207
208 Ok(headers.into())
209 }
210
211 async fn upload_part_from_buffer<S1, S2, B>(&self, bucket_name: S1, object_key: S2, buffer: B, params: UploadPartRequest) -> Result<UploadPartResult>
215 where
216 S1: AsRef<str> + Send,
217 S2: AsRef<str> + Send,
218 B: Into<Vec<u8>> + Send,
219 {
220 let request = build_upload_part_request(bucket_name.as_ref(), object_key.as_ref(), RequestBody::Bytes(buffer.into()), params)?;
221
222 let (headers, _) = self.do_request::<()>(request).await?;
223
224 Ok(headers.into())
225 }
226
227 async fn upload_part_from_base64<S1, S2, S3>(
231 &self,
232 bucket_name: S1,
233 object_key: S2,
234 base64_string: S3,
235 params: UploadPartRequest,
236 ) -> Result<UploadPartResult>
237 where
238 S1: AsRef<str> + Send,
239 S2: AsRef<str> + Send,
240 S3: AsRef<str> + Send,
241 {
242 let data = BASE64_STANDARD.decode(base64_string.as_ref())?;
243 self.upload_part_from_buffer(bucket_name, object_key, data, params).await
244 }
245
246 async fn upload_part_copy<S1, S2>(
252 &self,
253 bucket_name: S1,
254 dest_object_key: S2,
255 data: UploadPartCopyRequest,
256 options: Option<UploadPartCopyOptions>,
257 ) -> Result<UploadPartCopyResult>
258 where
259 S1: AsRef<str> + Send,
260 S2: AsRef<str> + Send,
261 {
262 let bucket_name = bucket_name.as_ref();
263 let object_key = dest_object_key.as_ref();
264 let requet = build_upload_part_copy_request(bucket_name, object_key, data, &options)?;
265 let (_, xml) = self.do_request::<String>(requet).await?;
266 UploadPartCopyResult::from_xml(&xml)
267 }
268
269 async fn complete_multipart_uploads<S1, S2>(
273 &self,
274 bucket_name: S1,
275 object_key: S2,
276 data: CompleteMultipartUploadRequest,
277 options: Option<CompleteMultipartUploadOptions>,
278 ) -> Result<CompleteMultipartUploadResult>
279 where
280 S1: AsRef<str> + Send,
281 S2: AsRef<str> + Send,
282 {
283 let with_callback = if let Some(opt) = &options { opt.callback.is_some() } else { false };
284
285 let request = build_complete_multipart_uploads_request(bucket_name.as_ref(), object_key.as_ref(), data, &options)?;
286 let (_, content) = self.do_request::<String>(request).await?;
287
288 if with_callback {
289 Ok(CompleteMultipartUploadResult::CallbackResponse(content))
290 } else {
291 Ok(CompleteMultipartUploadResult::ApiResponse(CompleteMultipartUploadApiResponse::from_xml(
292 &content,
293 )?))
294 }
295 }
296
297 async fn abort_multipart_uploads<S1, S2, S3>(&self, bucket_name: S1, object_key: S2, upload_id: S3) -> Result<()>
301 where
302 S1: AsRef<str> + Send,
303 S2: AsRef<str> + Send,
304 S3: AsRef<str> + Send,
305 {
306 let bucket_name = bucket_name.as_ref();
307 let object_key = object_key.as_ref();
308
309 if !validate_bucket_name(bucket_name) {
310 return Err(Error::Other(format!("invalid bucket name: {}", bucket_name)));
311 }
312
313 if !validate_object_key(object_key) {
314 return Err(Error::Other(format!("invalid object key: {}", object_key)));
315 }
316
317 if upload_id.as_ref().is_empty() {
318 return Err(Error::Other("invalid upload id: [empty]".to_string()));
319 }
320
321 let request = OssRequest::new()
322 .method(RequestMethod::Delete)
323 .bucket(bucket_name)
324 .object(object_key)
325 .add_query("uploadId", upload_id);
326
327 self.do_request::<()>(request).await?;
328
329 Ok(())
330 }
331}
332
333#[cfg(test)]
334mod test_multipart_async {
335 use std::{
336 io::{Read, Seek},
337 ops::Range,
338 sync::Once,
339 };
340
341 use uuid::Uuid;
342
343 use crate::{
344 multipart::MultipartUploadsOperations,
345 multipart_common::{
346 CompleteMultipartUploadOptions, CompleteMultipartUploadRequest, CompleteMultipartUploadResult, UploadPartCopyOptionsBuilder, UploadPartCopyRequest,
347 UploadPartRequest,
348 },
349 object::ObjectOperations,
350 object_common::{CallbackBodyParameter, CallbackBuilder},
351 Client,
352 };
353
354 static INIT: Once = Once::new();
355
356 fn setup() {
357 INIT.call_once(|| {
358 simple_logger::init_with_level(log::Level::Debug).unwrap();
359 dotenvy::dotenv().unwrap();
360 });
361 }
362
363 #[allow(dead_code)]
364 fn setup_comp() {
365 INIT.call_once(|| {
366 simple_logger::init_with_level(log::Level::Debug).unwrap();
367 dotenvy::from_filename(".env.comp").unwrap();
368 });
369 }
370
371 #[tokio::test]
406 async fn test_multipart_uploads_from_file_async() {
407 setup();
408
409 let client = Client::from_env();
410
411 let bucket = "yuanyq";
412 let object = format!("rust-sdk-test/multipart-{}.deb", Uuid::new_v4());
413 let file = "/home/yuanyq/Downloads/sourcegit_2025.06-1_amd64.deb";
414
415 let meta = std::fs::metadata(file).unwrap();
416
417 let slice_len: u64 = 5 * 1024 * 1024;
418 let mut ranges = vec![];
419 let mut c = 0;
420 loop {
421 let end = (c + 1) * slice_len;
422 let r = Range {
423 start: c * slice_len,
424 end: end.min(meta.len()),
425 };
426
427 ranges.push(r);
428
429 if end >= meta.len() {
430 break;
431 }
432
433 c += 1;
434 }
435
436 log::debug!("{:#?}", ranges);
437
438 let init_response = client.initiate_multipart_uploads(bucket, &object, None).await;
439 assert!(init_response.is_ok());
440
441 let init_result = init_response.unwrap();
442 let upload_id = init_result.upload_id.clone();
443 log::debug!("upload id = {}", upload_id);
444
445 let mut upload_results = vec![];
446
447 for (i, rng) in ranges.iter().enumerate() {
448 let upload_data = UploadPartRequest {
449 part_number: (i + 1) as u32,
450 upload_id: upload_id.clone(),
451 };
452
453 log::debug!("begin to upload part {}", i);
454
455 let upload_response = client.upload_part_from_file(bucket, &object, file, rng.clone(), upload_data).await;
456
457 log::debug!("{:#?}", upload_response);
458
459 assert!(upload_response.is_ok());
460
461 let upload_result = upload_response.unwrap();
462 upload_results.push(((i + 1) as u32, upload_result.etag));
463 }
464
465 log::debug!("all parts uploaded, check it");
466 let resp = client.list_parts(bucket, &object, &upload_id, None).await;
467 assert!(resp.is_ok());
468
469 let ret = resp.unwrap();
470 assert_eq!(ranges.len(), ret.parts.len());
471
472 log::debug!("going to complete multipart upload for upload id: {}", upload_id);
473
474 let comp_response = client
475 .complete_multipart_uploads(
476 bucket,
477 &object,
478 CompleteMultipartUploadRequest {
479 upload_id,
480 parts: upload_results,
481 },
482 None,
483 )
484 .await;
485
486 log::debug!("{:#?}", comp_response);
487
488 log::debug!("multipart uploads completed");
489
490 client.delete_object(bucket, &object, None).await.unwrap();
491 }
492
493 #[tokio::test]
494 async fn test_upload_part_from_buffer_async() {
495 setup();
496
497 let client = Client::from_env();
498
499 let bucket = "yuanyq";
500 let object = format!("rust-sdk-test/multipart-{}.deb", Uuid::new_v4());
501 let file = "/home/yuanyq/Downloads/sourcegit_2025.06-1_amd64.deb";
502
503 let meta = std::fs::metadata(file).unwrap();
504
505 let slice_len: u64 = 10 * 1024 * 1024;
506 let mut ranges = vec![];
507 let mut c = 0;
508 loop {
509 let end = (c + 1) * slice_len;
510 let r = Range {
511 start: c * slice_len,
512 end: end.min(meta.len()),
513 };
514
515 ranges.push(r);
516
517 if end >= meta.len() {
518 break;
519 }
520
521 c += 1;
522 }
523
524 log::debug!("{:#?}", ranges);
525
526 let init_response = client.initiate_multipart_uploads(bucket, &object, None).await;
527 assert!(init_response.is_ok());
528
529 let init_result = init_response.unwrap();
530 let upload_id = init_result.upload_id.clone();
531 log::debug!("upload id = {}", upload_id);
532
533 let mut upload_results = vec![];
534
535 for (i, rng) in ranges.iter().enumerate() {
536 let part_no = (i + 1) as u32;
537
538 log::debug!("begin to upload part {}", i);
539
540 let mut buf = Vec::new();
541 let mut stream = std::fs::File::open(file).unwrap();
542 stream.seek(std::io::SeekFrom::Start(rng.start)).unwrap();
543 let mut partial = stream.take(rng.end - rng.start);
544 partial.read_to_end(&mut buf).unwrap();
545
546 let upload_data = UploadPartRequest {
547 part_number: part_no,
548 upload_id: upload_id.clone(),
549 };
550
551 let upload_response = client.upload_part_from_buffer(bucket, &object, buf, upload_data).await;
552
553 log::debug!("{:#?}", upload_response);
554
555 assert!(upload_response.is_ok());
556
557 let upload_result = upload_response.unwrap();
558 upload_results.push(((i + 1) as u32, upload_result.etag));
559 }
560
561 let list_parts_response = client.list_parts(bucket, &object, upload_id.clone(), None).await;
562 log::debug!("{:#?}", list_parts_response);
563 assert!(list_parts_response.is_ok());
564 let list_parts_result = list_parts_response.unwrap();
565
566 assert_eq!(ranges.len(), list_parts_result.parts.len());
567
568 let abort_response = client.abort_multipart_uploads(bucket, &object, upload_id.clone()).await;
569 log::debug!("{:#?}", abort_response);
570 assert!(abort_response.is_ok());
571
572 let resp = client.exists(bucket, &object, None).await;
573 assert!(resp.is_ok());
574 assert!(!resp.unwrap());
575 }
576
577 #[tokio::test]
578 async fn test_upload_part_copy_asyn() {
579 setup();
580
581 let client = Client::from_env();
582
583 let bucket = "yuanyq";
584
585 let source_object_key = "rust-sdk-test/img-appended-from-file.jpg";
586 let dest_object_key = format!("rust-sdk-test/img-{}.jpg", Uuid::new_v4());
587
588 let init_response = client.initiate_multipart_uploads(bucket, &dest_object_key, None).await;
589 assert!(init_response.is_ok());
590
591 let upload_id = init_response.unwrap().upload_id.clone();
592
593 let upload_response = client
595 .upload_part_copy(
596 bucket,
597 &dest_object_key,
598 UploadPartCopyRequest::new(1, &upload_id, source_object_key),
599 Some(UploadPartCopyOptionsBuilder::new().copy_source_range("bytes=0-185000").build()),
600 )
601 .await;
602 assert!(upload_response.is_ok());
603 log::debug!("upload response 1: {:#?}", upload_response);
604
605 let etag1 = upload_response.unwrap().etag;
606
607 let upload_response = client
608 .upload_part_copy(
609 bucket,
610 &dest_object_key,
611 UploadPartCopyRequest::new(2, &upload_id, source_object_key),
612 Some(UploadPartCopyOptionsBuilder::new().copy_source_range("bytes=185001-").build()),
613 )
614 .await;
615 assert!(upload_response.is_ok());
616 log::debug!("upload response 2: {:#?}", upload_response);
617
618 let etag2 = upload_response.unwrap().etag;
619
620 let comp_data = CompleteMultipartUploadRequest {
621 upload_id,
622 parts: vec![(1, etag1), (2, etag2)],
623 };
624
625 let comp_response = client.complete_multipart_uploads(bucket, &dest_object_key, comp_data, None).await;
626 log::debug!("complete multipart upload response: {:#?}", comp_response);
627
628 client.delete_object(bucket, &dest_object_key, None).await.unwrap();
629 }
630
631 #[tokio::test]
632 async fn test_multipart_upload_with_callback() {
633 log::debug!("test multipart upload with callback while completing");
634 setup();
635
636 let callback_url = std::env::var("CALLBACK_TEST_URL").unwrap();
637
638 let client = Client::from_env();
639
640 let bucket = "yuanyq";
641 let object = format!("rust-sdk-test/multipart-{}.deb", Uuid::new_v4());
642 let file = "/home/yuanyq/Downloads/sourcegit_2025.06-1_amd64.deb";
643
644 let meta = std::fs::metadata(file).unwrap();
645
646 let slice_len: u64 = 5 * 1024 * 1024;
647 let mut ranges = vec![];
648 let mut c = 0;
649 loop {
650 let end = (c + 1) * slice_len;
651 let r = Range {
652 start: c * slice_len,
653 end: end.min(meta.len()),
654 };
655
656 ranges.push(r);
657
658 if end >= meta.len() {
659 break;
660 }
661
662 c += 1;
663 }
664
665 log::debug!("{:#?}", ranges);
666
667 let init_response = client.initiate_multipart_uploads(bucket, &object, None).await;
668 assert!(init_response.is_ok());
669
670 let init_result = init_response.unwrap();
671 let upload_id = init_result.upload_id.clone();
672 log::debug!("upload id = {}", upload_id);
673
674 let mut upload_results = vec![];
675
676 for (i, rng) in ranges.iter().enumerate() {
677 let upload_data = UploadPartRequest {
678 part_number: (i + 1) as u32,
679 upload_id: upload_id.clone(),
680 };
681
682 log::debug!("begin to upload part {}", i);
683
684 let upload_response = client.upload_part_from_file(bucket, &object, file, rng.clone(), upload_data).await;
685
686 log::debug!("{:#?}", upload_response);
687
688 assert!(upload_response.is_ok());
689
690 let upload_result = upload_response.unwrap();
691 upload_results.push(((i + 1) as u32, upload_result.etag));
692 }
693
694 log::debug!("all parts uploaded, check it");
695 let resp = client.list_parts(bucket, &object, &upload_id, None).await;
696 assert!(resp.is_ok());
697
698 let ret = resp.unwrap();
699 assert_eq!(ranges.len(), ret.parts.len());
700
701 log::debug!("going to complete multipart upload for upload id: {}", upload_id);
702
703 let cb = CallbackBuilder::new(&callback_url)
704 .body_parameter(CallbackBodyParameter::OssBucket("the_bucket"))
705 .body_parameter(CallbackBodyParameter::OssObject("the_object_key"))
706 .body_parameter(CallbackBodyParameter::OssETag("the_etag"))
707 .body_parameter(CallbackBodyParameter::OssSize("the_size"))
708 .body_parameter(CallbackBodyParameter::OssCrc64("the_crc"))
709 .body_parameter(CallbackBodyParameter::OssClientIp("the_client_ip"))
710 .body_parameter(CallbackBodyParameter::OssContentMd5("the_content_md5"))
711 .body_parameter(CallbackBodyParameter::OssMimeType("the_mime_type"))
712 .body_parameter(CallbackBodyParameter::OssImageWidth("the_image_width"))
713 .body_parameter(CallbackBodyParameter::OssImageHeight("the_image_height"))
714 .body_parameter(CallbackBodyParameter::OssImageFormat("the_image_format"))
715 .body_parameter(CallbackBodyParameter::Custom("my-key", "my-prop", "hello world".to_string()))
716 .body_parameter(CallbackBodyParameter::Constant("my-key-constant", "the-value"))
717 .body_parameter(CallbackBodyParameter::Literal("k1".to_string(), "${x:v1}".to_string()))
718 .custom_variable("v1", "this is value of v1")
719 .build();
720
721 let options = CompleteMultipartUploadOptions { callback: Some(cb) };
722
723 let comp_response = client
724 .complete_multipart_uploads(
725 bucket,
726 &object,
727 CompleteMultipartUploadRequest {
728 upload_id,
729 parts: upload_results,
730 },
731 Some(options),
732 )
733 .await;
734
735 log::debug!("{:#?}", comp_response);
736
737 log::debug!("multipart uploads completed");
738
739 let ret = comp_response.unwrap();
740
741 if let CompleteMultipartUploadResult::CallbackResponse(s) = ret {
742 assert!(s.contains(&serde_json::to_string(&object).unwrap()));
743 } else {
744 panic!("no callback json content returned");
745 }
746
747 client.delete_object(bucket, &object, None).await.unwrap();
748 }
749}
750
751#[cfg(test)]
752mod test {
753 use uuid::Uuid;
754
755 #[test]
756 fn test_serde_with_slash() {
757 let object = format!("rust-sdk-test/multipart-{}.deb", Uuid::new_v4());
758 let s = serde_json::to_string(&object).unwrap();
759 println!("{}", s);
760 }
761}