1use std::ops::Range;
2use std::path::Path;
3
4use base64::prelude::BASE64_STANDARD;
5use base64::Engine;
6
7use crate::error::Error;
8use crate::multipart_common::{
9 build_complete_multipart_uploads_request, build_initiate_multipart_uploads_request, build_list_multipart_uploads_request, build_list_parts_request,
10 build_upload_part_copy_request, build_upload_part_request, CompleteMultipartUploadApiResponse, CompleteMultipartUploadOptions,
11 CompleteMultipartUploadRequest, CompleteMultipartUploadResult, InitiateMultipartUploadOptions, InitiateMultipartUploadResult, ListMultipartUploadsOptions,
12 ListMultipartUploadsResult, ListPartsOptions, ListPartsResult, UploadPartCopyOptions, UploadPartCopyRequest, UploadPartCopyResult, UploadPartRequest,
13 UploadPartResult,
14};
15use crate::request::{OssRequest, RequestMethod};
16use crate::util::{validate_bucket_name, validate_object_key};
17use crate::{RequestBody, Result};
18
19use super::Client;
20
/// Blocking multipart-upload operations.
///
/// In this file the trait is implemented for the blocking `Client`. Every
/// method returns the crate-level `Result`, so request-building, transport and
/// response-parsing failures all surface as an `Err`.
pub trait MultipartUploadsOperations {
    /// Lists the multipart uploads of the bucket `bucket_name`.
    ///
    /// `options` is forwarded to the request builder; pass `None` to send the
    /// request without extra options. The `Client` implementation returns an
    /// error without sending anything when `bucket_name` fails
    /// `validate_bucket_name`.
    fn list_multipart_uploads<S>(&self, bucket_name: S, options: Option<ListMultipartUploadsOptions>) -> Result<ListMultipartUploadsResult>
    where
        S: AsRef<str>;

    /// Lists the parts belonging to the multipart upload `upload_id` of
    /// `object_key` in `bucket_name`.
    ///
    /// The returned `ListPartsResult` exposes the listed parts through its
    /// `parts` field.
    fn list_parts<S1, S2, S3>(&self, bucket_name: S1, object_key: S2, upload_id: S3, options: Option<ListPartsOptions>) -> Result<ListPartsResult>
    where
        S1: AsRef<str>,
        S2: AsRef<str>,
        S3: AsRef<str>;

    /// Starts a multipart upload for `object_key` in `bucket_name`.
    ///
    /// The returned `InitiateMultipartUploadResult` carries the `upload_id`
    /// that the other methods of this trait take as input.
    fn initiate_multipart_uploads<S1, S2>(
        &self,
        bucket_name: S1,
        object_key: S2,
        options: Option<InitiateMultipartUploadOptions>,
    ) -> Result<InitiateMultipartUploadResult>
    where
        S1: AsRef<str>,
        S2: AsRef<str>;

    /// Uploads one part whose content is the byte `range` of the local file at
    /// `file_path`.
    ///
    /// `params` names the upload (`upload_id`) and the part (`part_number`).
    /// The `Client` implementation builds the returned `UploadPartResult` from
    /// the response headers; its `etag` is what callers collect for completing
    /// the upload.
    fn upload_part_from_file<S1, S2, P>(
        &self,
        bucket_name: S1,
        object_key: S2,
        file_path: P,
        range: Range<u64>,
        params: UploadPartRequest,
    ) -> Result<UploadPartResult>
    where
        S1: AsRef<str>,
        S2: AsRef<str>,
        P: AsRef<Path>;

    /// Uploads one part whose content is `buffer`, converted into a `Vec<u8>`.
    ///
    /// `params` names the upload (`upload_id`) and the part (`part_number`).
    fn upload_part_from_buffer<S1, S2, B>(&self, bucket_name: S1, object_key: S2, buffer: B, params: UploadPartRequest) -> Result<UploadPartResult>
    where
        S1: AsRef<str>,
        S2: AsRef<str>,
        B: Into<Vec<u8>>;

    /// Uploads one part whose content is given as a Base64 string.
    ///
    /// The `Client` implementation decodes `base64_string` with the standard
    /// Base64 engine and then behaves like `upload_part_from_buffer`; a
    /// decoding failure is returned as an error before any request is sent.
    fn upload_part_from_base64<S1, S2, S3>(&self, bucket_name: S1, object_key: S2, base64_string: S3, params: UploadPartRequest) -> Result<UploadPartResult>
    where
        S1: AsRef<str>,
        S2: AsRef<str>,
        S3: AsRef<str>;

    /// Issues an "upload part copy" request for `dest_object_key` in
    /// `bucket_name`.
    ///
    /// `data` describes the part number, the upload id and the source object;
    /// `options` may carry further settings such as a source byte range.
    /// NOTE(review): presumably the part is filled with data copied from the
    /// source object on the server side — confirm against the request builder.
    fn upload_part_copy<S1, S2>(
        &self,
        bucket_name: S1,
        dest_object_key: S2,
        data: UploadPartCopyRequest,
        options: Option<UploadPartCopyOptions>,
    ) -> Result<UploadPartCopyResult>
    where
        S1: AsRef<str>,
        S2: AsRef<str>;

    /// Completes the multipart upload described by `data` (upload id plus the
    /// list of `(part_number, etag)` pairs).
    ///
    /// With the `Client` implementation, the result is
    /// `CompleteMultipartUploadResult::CallbackResponse` holding the raw
    /// response body when `options` carries a callback, and
    /// `CompleteMultipartUploadResult::ApiResponse` parsed from the XML body
    /// otherwise.
    fn complete_multipart_uploads<S1, S2>(
        &self,
        bucket_name: S1,
        object_key: S2,
        data: CompleteMultipartUploadRequest,
        options: Option<CompleteMultipartUploadOptions>,
    ) -> Result<CompleteMultipartUploadResult>
    where
        S1: AsRef<str>,
        S2: AsRef<str>;

    /// Aborts the multipart upload `upload_id` of `object_key` in
    /// `bucket_name`.
    ///
    /// The `Client` implementation validates the bucket name and object key,
    /// rejects an empty `upload_id`, and then sends a `DELETE` request with
    /// the `uploadId` query parameter.
    fn abort_multipart_uploads<S1, S2, S3>(&self, bucket_name: S1, object_key: S2, upload_id: S3) -> Result<()>
    where
        S1: AsRef<str>,
        S2: AsRef<str>,
        S3: AsRef<str>;
}
124
125impl MultipartUploadsOperations for Client {
126 fn list_multipart_uploads<S>(&self, bucket_name: S, options: Option<ListMultipartUploadsOptions>) -> Result<ListMultipartUploadsResult>
130 where
131 S: AsRef<str>,
132 {
133 if !validate_bucket_name(bucket_name.as_ref()) {
134 return Err(Error::Other(format!("invalid bucket name: {}", bucket_name.as_ref())));
135 }
136 let request = build_list_multipart_uploads_request(bucket_name.as_ref(), &options)?;
137 let (_, xml) = self.do_request::<String>(request)?;
138
139 ListMultipartUploadsResult::from_xml(&xml)
140 }
141
142 fn list_parts<S1, S2, S3>(&self, bucket_name: S1, object_key: S2, upload_id: S3, options: Option<ListPartsOptions>) -> Result<ListPartsResult>
146 where
147 S1: AsRef<str>,
148 S2: AsRef<str>,
149 S3: AsRef<str>,
150 {
151 let request = build_list_parts_request(bucket_name.as_ref(), object_key.as_ref(), upload_id.as_ref(), &options)?;
152 let (_, xml) = self.do_request::<String>(request)?;
153 ListPartsResult::from_xml(&xml)
154 }
155
156 fn initiate_multipart_uploads<S1, S2>(
160 &self,
161 bucket_name: S1,
162 object_key: S2,
163 options: Option<InitiateMultipartUploadOptions>,
164 ) -> Result<InitiateMultipartUploadResult>
165 where
166 S1: AsRef<str>,
167 S2: AsRef<str>,
168 {
169 let request = build_initiate_multipart_uploads_request(bucket_name.as_ref(), object_key.as_ref(), &options)?;
170 let (_, xml) = self.do_request::<String>(request)?;
171 InitiateMultipartUploadResult::from_xml(&xml)
172 }
173
174 fn upload_part_from_file<S1, S2, P>(
178 &self,
179 bucket_name: S1,
180 object_key: S2,
181 file_path: P,
182 range: Range<u64>,
183 params: UploadPartRequest,
184 ) -> Result<UploadPartResult>
185 where
186 S1: AsRef<str>,
187 S2: AsRef<str>,
188 P: AsRef<Path>,
189 {
190 let request = build_upload_part_request(
191 bucket_name.as_ref(),
192 object_key.as_ref(),
193 RequestBody::File(file_path.as_ref().to_path_buf(), Some(range)),
194 params,
195 )?;
196
197 let (headers, _) = self.do_request::<()>(request)?;
198
199 Ok(headers.into())
200 }
201
202 fn upload_part_from_buffer<S1, S2, B>(&self, bucket_name: S1, object_key: S2, buffer: B, params: UploadPartRequest) -> Result<UploadPartResult>
206 where
207 S1: AsRef<str>,
208 S2: AsRef<str>,
209 B: Into<Vec<u8>>,
210 {
211 let request = build_upload_part_request(bucket_name.as_ref(), object_key.as_ref(), RequestBody::Bytes(buffer.into()), params)?;
212
213 let (headers, _) = self.do_request::<()>(request)?;
214
215 Ok(headers.into())
216 }
217
218 fn upload_part_from_base64<S1, S2, S3>(&self, bucket_name: S1, object_key: S2, base64_string: S3, params: UploadPartRequest) -> Result<UploadPartResult>
222 where
223 S1: AsRef<str>,
224 S2: AsRef<str>,
225 S3: AsRef<str>,
226 {
227 let data = BASE64_STANDARD.decode(base64_string.as_ref())?;
228 self.upload_part_from_buffer(bucket_name, object_key, data, params)
229 }
230
231 fn upload_part_copy<S1, S2>(
237 &self,
238 bucket_name: S1,
239 dest_object_key: S2,
240 data: UploadPartCopyRequest,
241 options: Option<UploadPartCopyOptions>,
242 ) -> Result<UploadPartCopyResult>
243 where
244 S1: AsRef<str>,
245 S2: AsRef<str>,
246 {
247 let bucket_name = bucket_name.as_ref();
248 let object_key = dest_object_key.as_ref();
249 let requet = build_upload_part_copy_request(bucket_name, object_key, data, &options)?;
250 let (_, xml) = self.do_request::<String>(requet)?;
251 UploadPartCopyResult::from_xml(&xml)
252 }
253
254 fn complete_multipart_uploads<S1, S2>(
258 &self,
259 bucket_name: S1,
260 object_key: S2,
261 data: CompleteMultipartUploadRequest,
262 options: Option<CompleteMultipartUploadOptions>,
263 ) -> Result<CompleteMultipartUploadResult>
264 where
265 S1: AsRef<str>,
266 S2: AsRef<str>,
267 {
268 let with_callback = if let Some(opt) = &options { opt.callback.is_some() } else { false };
269
270 let request = build_complete_multipart_uploads_request(bucket_name.as_ref(), object_key.as_ref(), data, &options)?;
271 let (_, content) = self.do_request::<String>(request)?;
272
273 if with_callback {
274 Ok(CompleteMultipartUploadResult::CallbackResponse(content))
275 } else {
276 Ok(CompleteMultipartUploadResult::ApiResponse(CompleteMultipartUploadApiResponse::from_xml(
277 &content,
278 )?))
279 }
280 }
281
282 fn abort_multipart_uploads<S1, S2, S3>(&self, bucket_name: S1, object_key: S2, upload_id: S3) -> Result<()>
286 where
287 S1: AsRef<str>,
288 S2: AsRef<str>,
289 S3: AsRef<str>,
290 {
291 let bucket_name = bucket_name.as_ref();
292 let object_key = object_key.as_ref();
293
294 if !validate_bucket_name(bucket_name) {
295 return Err(Error::Other(format!("invalid bucket name: {}", bucket_name)));
296 }
297
298 if !validate_object_key(object_key) {
299 return Err(Error::Other(format!("invalid object key: {}", object_key)));
300 }
301
302 if upload_id.as_ref().is_empty() {
303 return Err(Error::Other("invalid upload id: [empty]".to_string()));
304 }
305
306 let request = OssRequest::new()
307 .method(RequestMethod::Delete)
308 .bucket(bucket_name)
309 .object(object_key)
310 .add_query("uploadId", upload_id);
311
312 self.do_request::<()>(request)?;
313
314 Ok(())
315 }
316}
317
#[cfg(all(test, feature = "blocking"))]
mod test_multipart_blocking {
    //! Integration tests for the blocking multipart-upload API.
    //!
    //! The tests build their client with `Client::from_env()` after loading a
    //! `.env` file, send real requests, and read a local file from a fixed
    //! path, so they only pass on a machine prepared accordingly.

    use std::{
        io::{Read, Seek},
        ops::Range,
        sync::Once,
    };

    use uuid::Uuid;

    use crate::{
        blocking::{multipart::MultipartUploadsOperations, object::ObjectOperations, Client},
        multipart_common::{
            CompleteMultipartUploadOptions, CompleteMultipartUploadRequest, CompleteMultipartUploadResult, UploadPartCopyOptionsBuilder, UploadPartCopyRequest,
            UploadPartRequest,
        },
        object_common::{CallbackBodyParameter, CallbackBuilder},
    };

    static INIT: Once = Once::new();

    /// Bucket every test in this module operates on.
    const BUCKET: &str = "yuanyq";

    /// Local file that the multipart tests upload slice by slice.
    const LOCAL_FILE: &str = "/home/yuanyq/Downloads/sourcegit_2025.06-1_amd64.deb";

    /// Initialises logging and loads `.env` exactly once per test binary.
    fn setup() {
        INIT.call_once(|| {
            simple_logger::init_with_level(log::Level::Debug).unwrap();
            dotenvy::dotenv().unwrap();
        });
    }

    /// Same as `setup`, but loads `.env.comp` instead of `.env`.
    // Currently unused by any test in this module; the lint is silenced so the
    // alternative setup stays available.
    #[allow(dead_code)]
    fn setup_comp() {
        INIT.call_once(|| {
            simple_logger::init_with_level(log::Level::Debug).unwrap();
            dotenvy::from_filename(".env.comp").unwrap();
        });
    }

    /// Splits `0..total_len` into consecutive ranges of at most `slice_len` bytes.
    ///
    /// At least one range is always returned, so a `total_len` of zero yields
    /// the single empty range `0..0`.
    fn split_ranges(total_len: u64, slice_len: u64) -> Vec<Range<u64>> {
        assert!(slice_len > 0, "slice_len must be positive");

        let mut ranges = Vec::new();
        let mut start = 0u64;
        loop {
            let end = start.saturating_add(slice_len).min(total_len);
            ranges.push(start..end);

            if end >= total_len {
                break;
            }

            start = end;
        }

        ranges
    }

    /// Builds a fresh, unique object key for a multipart test object.
    fn new_multipart_object_key() -> String {
        format!("rust-sdk-test/multipart-{}.deb", Uuid::new_v4())
    }

    /// Initiates a multipart upload for `object`, uploads `LOCAL_FILE` in
    /// slices of `slice_len` bytes via `upload_part_from_file`, and checks
    /// that `list_parts` reports one part per slice.
    ///
    /// Returns the request needed to complete the upload.
    fn upload_local_file_parts(client: &Client, object: &str, slice_len: u64) -> CompleteMultipartUploadRequest {
        let file_len = std::fs::metadata(LOCAL_FILE).unwrap().len();
        let ranges = split_ranges(file_len, slice_len);

        log::debug!("{:#?}", ranges);

        let init_response = client.initiate_multipart_uploads(BUCKET, object, None);
        assert!(init_response.is_ok());

        let upload_id = init_response.unwrap().upload_id;
        log::debug!("upload id = {}", upload_id);

        let mut parts = vec![];

        // Part numbers start at 1, hence the zip with `1u32..`.
        for (rng, part_number) in ranges.iter().zip(1u32..) {
            let upload_data = UploadPartRequest {
                part_number,
                upload_id: upload_id.clone(),
            };

            log::debug!("begin to upload part {}", part_number);

            let upload_response = client.upload_part_from_file(BUCKET, object, LOCAL_FILE, rng.clone(), upload_data);

            log::debug!("{:#?}", upload_response);

            assert!(upload_response.is_ok());

            parts.push((part_number, upload_response.unwrap().etag));
        }

        log::debug!("all parts uploaded, check it");
        let list_response = client.list_parts(BUCKET, object, &upload_id, None);
        assert!(list_response.is_ok());
        assert_eq!(ranges.len(), list_response.unwrap().parts.len());

        log::debug!("going to complete multipart upload for upload id: {}", upload_id);

        CompleteMultipartUploadRequest { upload_id, parts }
    }

    /// Uploads the local file in 5 MiB parts and completes the upload.
    #[test]
    fn test_multipart_uploads_from_file_blocking() {
        setup();

        let client = Client::from_env();
        let object = new_multipart_object_key();

        let comp_data = upload_local_file_parts(&client, &object, 5 * 1024 * 1024);

        let comp_response = client.complete_multipart_uploads(BUCKET, &object, comp_data, None);

        log::debug!("{:#?}", comp_response);

        // Fail here when completion fails instead of relying on the cleanup below.
        assert!(comp_response.is_ok());

        log::debug!("multipart uploads completed");

        client.delete_object(BUCKET, &object, None).unwrap();
    }

    /// Uploads the local file in 10 MiB in-memory parts, then aborts the
    /// upload and checks that the object does not exist.
    #[test]
    fn test_upload_part_from_buffer_blocking() {
        setup();

        let client = Client::from_env();
        let object = new_multipart_object_key();

        let file_len = std::fs::metadata(LOCAL_FILE).unwrap().len();
        let ranges = split_ranges(file_len, 10 * 1024 * 1024);

        log::debug!("{:#?}", ranges);

        let init_response = client.initiate_multipart_uploads(BUCKET, &object, None);
        assert!(init_response.is_ok());

        let upload_id = init_response.unwrap().upload_id;
        log::debug!("upload id = {}", upload_id);

        // One handle is enough: every part seeks to its own start offset.
        let mut stream = std::fs::File::open(LOCAL_FILE).unwrap();

        for (rng, part_number) in ranges.iter().zip(1u32..) {
            log::debug!("begin to upload part {}", part_number);

            let mut buf = Vec::new();
            stream.seek(std::io::SeekFrom::Start(rng.start)).unwrap();
            stream.by_ref().take(rng.end - rng.start).read_to_end(&mut buf).unwrap();

            let upload_data = UploadPartRequest {
                part_number,
                upload_id: upload_id.clone(),
            };

            let upload_response = client.upload_part_from_buffer(BUCKET, &object, buf, upload_data);

            log::debug!("{:#?}", upload_response);

            assert!(upload_response.is_ok());
        }

        let list_parts_response = client.list_parts(BUCKET, &object, &upload_id, None);
        log::debug!("{:#?}", list_parts_response);
        assert!(list_parts_response.is_ok());

        assert_eq!(ranges.len(), list_parts_response.unwrap().parts.len());

        let abort_response = client.abort_multipart_uploads(BUCKET, &object, &upload_id);
        log::debug!("{:#?}", abort_response);
        assert!(abort_response.is_ok());

        let resp = client.exists(BUCKET, &object, None);
        assert!(resp.is_ok());
        assert!(!resp.unwrap());
    }

    /// Assembles a new object from two byte ranges of an existing object via
    /// `upload_part_copy`, then completes the upload.
    #[test]
    fn test_upload_part_copy_blocking() {
        setup();

        let client = Client::from_env();

        let source_object_key = "rust-sdk-test/img-appended-from-file.jpg";
        let dest_object_key = format!("rust-sdk-test/img-{}.jpg", Uuid::new_v4());

        let init_response = client.initiate_multipart_uploads(BUCKET, &dest_object_key, None);
        assert!(init_response.is_ok());

        let upload_id = init_response.unwrap().upload_id;

        let upload_response = client.upload_part_copy(
            BUCKET,
            &dest_object_key,
            UploadPartCopyRequest::new(1, &upload_id, source_object_key),
            Some(UploadPartCopyOptionsBuilder::new().copy_source_range("bytes=0-185000").build()),
        );
        assert!(upload_response.is_ok());
        log::debug!("upload response 1: {:#?}", upload_response);

        let etag1 = upload_response.unwrap().etag;

        let upload_response = client.upload_part_copy(
            BUCKET,
            &dest_object_key,
            UploadPartCopyRequest::new(2, &upload_id, source_object_key),
            Some(UploadPartCopyOptionsBuilder::new().copy_source_range("bytes=185001-").build()),
        );
        assert!(upload_response.is_ok());
        log::debug!("upload response 2: {:#?}", upload_response);

        let etag2 = upload_response.unwrap().etag;

        let comp_data = CompleteMultipartUploadRequest {
            upload_id,
            parts: vec![(1, etag1), (2, etag2)],
        };

        let comp_response = client.complete_multipart_uploads(BUCKET, &dest_object_key, comp_data, None);
        log::debug!("complete multipart upload response: {:#?}", comp_response);

        // Fail here when completion fails instead of relying on the cleanup below.
        assert!(comp_response.is_ok());

        client.delete_object(BUCKET, &dest_object_key, None).unwrap();
    }

    /// Completes a multipart upload with a callback configured and checks
    /// that the callback response body mentions the object key.
    #[test]
    fn test_multipart_upload_with_callback_blocking() {
        // The logger is initialised by `setup`, so log only afterwards.
        setup();
        log::debug!("test multipart upload with callback while completing");

        let callback_url = std::env::var("CALLBACK_TEST_URL").unwrap();

        let client = Client::from_env();
        let object = new_multipart_object_key();

        let comp_data = upload_local_file_parts(&client, &object, 5 * 1024 * 1024);

        let cb = CallbackBuilder::new(&callback_url)
            .body_parameter(CallbackBodyParameter::OssBucket("the_bucket"))
            .body_parameter(CallbackBodyParameter::OssObject("the_object_key"))
            .body_parameter(CallbackBodyParameter::OssETag("the_etag"))
            .body_parameter(CallbackBodyParameter::OssSize("the_size"))
            .body_parameter(CallbackBodyParameter::OssCrc64("the_crc"))
            .body_parameter(CallbackBodyParameter::OssClientIp("the_client_ip"))
            .body_parameter(CallbackBodyParameter::OssContentMd5("the_content_md5"))
            .body_parameter(CallbackBodyParameter::OssMimeType("the_mime_type"))
            .body_parameter(CallbackBodyParameter::OssImageWidth("the_image_width"))
            .body_parameter(CallbackBodyParameter::OssImageHeight("the_image_height"))
            .body_parameter(CallbackBodyParameter::OssImageFormat("the_image_format"))
            .body_parameter(CallbackBodyParameter::Custom("my-key", "my-prop", "hello world".to_string()))
            .body_parameter(CallbackBodyParameter::Constant("my-key-constant", "the-value"))
            .body_parameter(CallbackBodyParameter::Literal("k1".to_string(), "${x:v1}".to_string()))
            .custom_variable("v1", "this is value of v1")
            .build();

        let options = CompleteMultipartUploadOptions { callback: Some(cb) };

        let comp_response = client.complete_multipart_uploads(BUCKET, &object, comp_data, Some(options));

        log::debug!("{:#?}", comp_response);

        log::debug!("multipart uploads completed");

        let ret = comp_response.unwrap();

        if let CompleteMultipartUploadResult::CallbackResponse(s) = ret {
            assert!(s.contains(&serde_json::to_string(&object).unwrap()));
        } else {
            panic!("no callback json content returned");
        }

        client.delete_object(BUCKET, &object, None).unwrap();
    }
}