use super::{
    super::{
        multi_parts_uploader::{MultiPartsUploaderExt, PartsExpiredError},
        ConcurrencyProvider, DataPartitionProvider, DataSource, FixedDataPartitionProvider, MultiPartsUploader,
        ObjectParams, ReinitializeOptions, UploadedPart,
    },
    utils::{
        keep_original_region_options, need_to_retry, no_region_tried_error, remove_used_region_from_regions,
        specify_region_options, UploadPartsError, UploadResumedPartsError,
    },
    MultiPartsUploaderScheduler,
};
use qiniu_apis::http_client::{ApiResult, ResponseError};
use serde_json::Value;
use std::num::NonZeroU64;

#[cfg(feature = "async")]
use {
    super::AsyncDataSource,
    futures::future::{BoxFuture, OptionFuture},
};

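/// Serial multi-parts uploader scheduler.
///
/// Parts are uploaded one by one: each part is submitted only after the
/// previous one has completed, so no concurrency provider is consulted.
/// If the upload cannot be resumed or completed in the current region,
/// the scheduler retries the remaining regions of the bucket in order.
///
/// A minimal usage sketch, modeled on the tests at the bottom of this file
/// (the `upload_manager`, `records_dir` and `file_source` values are
/// assumed to be prepared by the caller):
///
/// ```rust,ignore
/// let scheduler = SerialMultiPartsUploaderScheduler::new(MultiPartsV2Uploader::new(
///     upload_manager,
///     FileSystemResumableRecorder::<Sha1>::new(records_dir),
/// ));
/// let response = scheduler.upload(file_source, ObjectParams::builder().build())?;
/// ```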
#[derive(Debug, Clone)]
pub struct SerialMultiPartsUploaderScheduler<M: MultiPartsUploader> {
    data_partition_provider: Box<dyn DataPartitionProvider>,
    multi_parts_uploader: M,
}

impl<M: MultiPartsUploader> SerialMultiPartsUploaderScheduler<M> {
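    /// Creates the scheduler from a [`MultiPartsUploader`], with a fixed
    /// default part size of 4 MiB.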
    #[inline]
    pub fn new(multi_parts_uploader: M) -> Self {
        Self {
            data_partition_provider: Box::new(FixedDataPartitionProvider::new_with_non_zero_part_size(
                #[allow(unsafe_code)]
                unsafe {
                    NonZeroU64::new_unchecked(1 << 22)
                },
            )),
            multi_parts_uploader,
        }
    }

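    /// Returns the data partition provider that decides how the data source
    /// is sliced into parts.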
    #[inline]
    pub fn data_partition_provider(&self) -> &dyn DataPartitionProvider {
        &self.data_partition_provider
    }
}

impl<M: MultiPartsUploader> MultiPartsUploaderScheduler<M::HashAlgorithm> for SerialMultiPartsUploaderScheduler<M> {
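    // A serial scheduler uploads exactly one part at a time, so any
    // concurrency provider handed to it is deliberately ignored.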
    fn set_concurrency_provider(&mut self, _concurrency_provider: Box<dyn ConcurrencyProvider>) {}

    fn set_data_partition_provider(&mut self, data_partition_provider: Box<dyn DataPartitionProvider>) {
        self.data_partition_provider = data_partition_provider;
    }

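    // Strategy: first try to resume a previously recorded upload; if there
    // is nothing to resume, or resuming fails with a retryable error, fall
    // back to initializing and uploading against each region of the bucket
    // in turn.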
    fn upload(&self, source: Box<dyn DataSource<M::HashAlgorithm>>, params: ObjectParams) -> ApiResult<Value> {
        return match _resume_and_upload(self, source.to_owned(), params.to_owned()) {
            None => match _try_to_upload_to_all_regions(self, source, params, None) {
                Ok(None) => Err(no_region_tried_error()),
                Ok(Some(value)) => Ok(value),
                Err(err) => Err(err),
            },
            Some(Err(UploadPartsError { err, .. })) if !need_to_retry(&err) => Err(err),
            Some(Err(UploadPartsError { initialized, err })) => {
                match _try_to_upload_to_all_regions(self, source, params, initialized) {
                    Ok(None) => Err(err),
                    Ok(Some(value)) => Ok(value),
                    Err(err) => Err(err),
                }
            }
            Some(Ok(value)) => Ok(value),
        };

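        // Resumes recorded parts and finishes the upload. When the recorded
        // parts have expired on the server, reinitializes in the original
        // region and uploads again before giving up.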
        fn _resume_and_upload<M: MultiPartsUploader>(
            scheduler: &SerialMultiPartsUploaderScheduler<M>,
            source: Box<dyn DataSource<M::HashAlgorithm>>,
            params: ObjectParams,
        ) -> Option<Result<Value, UploadPartsError<M::InitializedParts>>> {
            _upload_resumed_parts(scheduler, source, params).map(|result| match result {
                Ok(value) => Ok(value),
                Err(UploadResumedPartsError {
                    err,
                    resumed: true,
                    initialized: Some(mut initialized),
                }) if err.extensions().get::<PartsExpiredError>().is_some() => {
                    match _reinitialize_and_upload_again(scheduler, &mut initialized, keep_original_region_options()) {
                        Some(Ok(value)) => Ok(value),
                        Some(Err(err)) => Err(UploadPartsError::new(err, Some(initialized))),
                        None => Err(UploadPartsError::new(err, Some(initialized))),
                    }
                }
                Err(UploadResumedPartsError { err, initialized, .. }) => Err(UploadPartsError::new(err, initialized)),
            })
        }

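        // Asks the uploader to resume from previously recorded parts;
        // returns `None` when there is no record to resume from.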
        fn _upload_resumed_parts<M: MultiPartsUploader>(
            scheduler: &SerialMultiPartsUploaderScheduler<M>,
            source: Box<dyn DataSource<M::HashAlgorithm>>,
            params: ObjectParams,
        ) -> Option<Result<Value, UploadResumedPartsError<M::InitializedParts>>> {
            scheduler
                .multi_parts_uploader
                .try_to_resume_parts(source, params)
                .map(|initialized| {
                    _upload_after_initialize(scheduler, &initialized)
                        .map_err(|(err, resumed)| UploadResumedPartsError::new(err, resumed, Some(initialized)))
                })
        }

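        // Walks the bucket regions in order, skipping the region that was
        // already tried, and attempts a full upload in each until one
        // succeeds or a non-retryable error is hit.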
        fn _try_to_upload_to_all_regions<M: MultiPartsUploader>(
            scheduler: &SerialMultiPartsUploaderScheduler<M>,
            source: Box<dyn DataSource<M::HashAlgorithm>>,
            params: ObjectParams,
            mut initialized: Option<M::InitializedParts>,
        ) -> ApiResult<Option<Value>> {
            let mut regions = scheduler
                .multi_parts_uploader
                .get_bucket_regions(&params)
                .map(|r| r.into_regions())?;
            if let Some(initialized) = &initialized {
                remove_used_region_from_regions(&mut regions, initialized);
            }
            let mut last_err = None;
            for region in regions {
                let initialized_result = if let Some(mut initialized) = initialized.take() {
                    scheduler
                        .multi_parts_uploader
                        .reinitialize_parts(&mut initialized, specify_region_options(region))
                        .map(|_| initialized)
                } else {
                    scheduler
                        .multi_parts_uploader
                        .initialize_parts(source.to_owned(), params.to_owned())
                };
                let new_initialized = match initialized_result {
                    Ok(new_initialized) => {
                        initialized = Some(new_initialized.to_owned());
                        new_initialized
                    }
                    Err(err) => {
                        let to_retry = need_to_retry(&err);
                        last_err = Some(err);
                        if to_retry {
                            continue;
                        } else {
                            break;
                        }
                    }
                };
                match _upload_after_reinitialize(scheduler, &new_initialized) {
                    Ok(value) => {
                        return Ok(Some(value));
                    }
                    Err(err) => {
                        let to_retry = need_to_retry(&err);
                        last_err = Some(err);
                        if to_retry {
                            continue;
                        } else {
                            break;
                        }
                    }
                }
            }
            last_err.map_or(Ok(None), Err)
        }

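        // Uploads parts serially until the source is exhausted, then
        // completes the upload. Also reports whether any part was served
        // from the resumable record, so the caller can tell expired resumed
        // parts apart from a fresh failure.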
        fn _upload_after_initialize<M: MultiPartsUploader>(
            scheduler: &SerialMultiPartsUploaderScheduler<M>,
            initialized: &M::InitializedParts,
        ) -> Result<Value, (ResponseError, bool)> {
            let mut parts = Vec::with_capacity(4);
            let mut resumed = false;
            loop {
                match scheduler
                    .multi_parts_uploader
                    .upload_part(initialized, &scheduler.data_partition_provider)
                {
                    Ok(Some(uploaded_part)) => {
                        if uploaded_part.resumed() {
                            resumed = true;
                        }
                        parts.push(uploaded_part);
                    }
                    Ok(None) => break,
                    Err(err) => return Err((err, resumed)),
                }
            }
            scheduler
                .multi_parts_uploader
                .complete_parts(initialized, &parts)
                .map_err(|err| (err, resumed))
        }

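        // Reinitializes the upload in place and retries; returns `None`
        // when reinitialization itself fails.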
        fn _reinitialize_and_upload_again<M: MultiPartsUploader>(
            scheduler: &SerialMultiPartsUploaderScheduler<M>,
            initialized: &mut M::InitializedParts,
            reinitialize_options: ReinitializeOptions,
        ) -> Option<ApiResult<Value>> {
            scheduler
                .multi_parts_uploader
                .reinitialize_parts(initialized, reinitialize_options)
                .ok()
                .map(|_| _upload_after_reinitialize(scheduler, initialized))
        }

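        // Like `_upload_after_initialize`, but used after a fresh
        // (re)initialization, where the caller no longer needs the
        // `resumed` flag.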
        fn _upload_after_reinitialize<M: MultiPartsUploader>(
            scheduler: &SerialMultiPartsUploaderScheduler<M>,
            initialized: &M::InitializedParts,
        ) -> ApiResult<Value> {
            let mut parts = Vec::with_capacity(4);
            while let Some(uploaded_part) = scheduler
                .multi_parts_uploader
                .upload_part(initialized, &scheduler.data_partition_provider)?
            {
                parts.push(uploaded_part);
            }
            scheduler.multi_parts_uploader.complete_parts(initialized, &parts)
        }
    }

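    // The async path below mirrors the sync `upload` above, calling the
    // uploader's async counterparts and threading the `Option` results
    // through `OptionFuture` where the sync code used plain combinators.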
    #[cfg(feature = "async")]
    #[cfg_attr(feature = "docs", doc(cfg(feature = "async")))]
    fn async_upload(
        &self,
        source: Box<dyn AsyncDataSource<M::HashAlgorithm>>,
        params: ObjectParams,
    ) -> BoxFuture<ApiResult<Value>> {
        return Box::pin(async move {
            match _resume_and_upload(self, source.to_owned(), params.to_owned()).await {
                None => match _try_to_upload_to_all_regions(self, source, params, None).await {
                    Ok(None) => Err(no_region_tried_error()),
                    Ok(Some(value)) => Ok(value),
                    Err(err) => Err(err),
                },
                Some(Err(UploadPartsError { err, .. })) if !need_to_retry(&err) => Err(err),
                Some(Err(UploadPartsError { initialized, err })) => {
                    match _try_to_upload_to_all_regions(self, source, params, initialized).await {
                        Ok(None) => Err(err),
                        Ok(Some(value)) => Ok(value),
                        Err(err) => Err(err),
                    }
                }
                Some(Ok(value)) => Ok(value),
            }
        });

        async fn _resume_and_upload<M: MultiPartsUploader>(
            scheduler: &SerialMultiPartsUploaderScheduler<M>,
            source: Box<dyn AsyncDataSource<M::HashAlgorithm>>,
            params: ObjectParams,
        ) -> Option<Result<Value, UploadPartsError<M::AsyncInitializedParts>>> {
            OptionFuture::from(
                _upload_resumed_parts(scheduler, source, params)
                    .await
                    .map(|result| async move {
                        match result {
                            Ok(value) => Ok(value),
                            Err(UploadResumedPartsError {
                                err,
                                resumed: true,
                                initialized: Some(mut initialized),
                            }) if err.extensions().get::<PartsExpiredError>().is_some() => {
                                match _reinitialize_and_upload_again(
                                    scheduler,
                                    &mut initialized,
                                    keep_original_region_options(),
                                )
                                .await
                                {
                                    Some(Ok(value)) => Ok(value),
                                    Some(Err(err)) => Err(UploadPartsError::new(err, Some(initialized))),
                                    None => Err(UploadPartsError::new(err, Some(initialized))),
                                }
                            }
                            Err(UploadResumedPartsError { err, initialized, .. }) => {
                                Err(UploadPartsError::new(err, initialized))
                            }
                        }
                    }),
            )
            .await
        }

        async fn _upload_resumed_parts<M: MultiPartsUploader>(
            scheduler: &SerialMultiPartsUploaderScheduler<M>,
            source: Box<dyn AsyncDataSource<M::HashAlgorithm>>,
            params: ObjectParams,
        ) -> Option<Result<Value, UploadResumedPartsError<M::AsyncInitializedParts>>> {
            OptionFuture::from(
                scheduler
                    .multi_parts_uploader
                    .try_to_async_resume_parts(source, params)
                    .await
                    .map(|initialized| async move {
                        _upload_after_initialize(scheduler, &initialized)
                            .await
                            .map_err(|(err, resumed)| UploadResumedPartsError::new(err, resumed, Some(initialized)))
                    }),
            )
            .await
        }

        async fn _try_to_upload_to_all_regions<M: MultiPartsUploader>(
            scheduler: &SerialMultiPartsUploaderScheduler<M>,
            source: Box<dyn AsyncDataSource<M::HashAlgorithm>>,
            params: ObjectParams,
            mut initialized: Option<M::AsyncInitializedParts>,
        ) -> ApiResult<Option<Value>> {
            let mut regions = scheduler
                .multi_parts_uploader
                .async_get_bucket_regions(&params)
                .await
                .map(|r| r.into_regions())?;
            if let Some(initialized) = &initialized {
                remove_used_region_from_regions(&mut regions, initialized);
            }
            let mut last_err = None;
            for region in regions {
                let initialized_result = if let Some(mut initialized) = initialized.take() {
                    scheduler
                        .multi_parts_uploader
                        .async_reinitialize_parts(&mut initialized, specify_region_options(region))
                        .await
                        .map(|_| initialized)
                } else {
                    scheduler
                        .multi_parts_uploader
                        .async_initialize_parts(source.to_owned(), params.to_owned())
                        .await
                };
                let new_initialized = match initialized_result {
                    Ok(new_initialized) => {
                        initialized = Some(new_initialized.to_owned());
                        new_initialized
                    }
                    Err(err) => {
                        let to_retry = need_to_retry(&err);
                        last_err = Some(err);
                        if to_retry {
                            continue;
                        } else {
                            break;
                        }
                    }
                };
                match _upload_after_reinitialize(scheduler, &new_initialized).await {
                    Ok(value) => {
                        return Ok(Some(value));
                    }
                    Err(err) => {
                        let to_retry = need_to_retry(&err);
                        last_err = Some(err);
                        if to_retry {
                            continue;
                        } else {
                            break;
                        }
                    }
                }
            }
            last_err.map_or(Ok(None), Err)
        }

        async fn _upload_after_initialize<M: MultiPartsUploader>(
            scheduler: &SerialMultiPartsUploaderScheduler<M>,
            initialized: &M::AsyncInitializedParts,
        ) -> Result<Value, (ResponseError, bool)> {
            let mut parts = Vec::with_capacity(4);
            let mut resumed = false;
            loop {
                match scheduler
                    .multi_parts_uploader
                    .async_upload_part(initialized, &scheduler.data_partition_provider)
                    .await
                {
                    Ok(Some(uploaded_part)) => {
                        if uploaded_part.resumed() {
                            resumed = true;
                        }
                        parts.push(uploaded_part);
                    }
                    Ok(None) => break,
                    Err(err) => return Err((err, resumed)),
                }
            }
            scheduler
                .multi_parts_uploader
                .async_complete_parts(initialized, &parts)
                .await
                .map_err(|err| (err, resumed))
        }

        async fn _reinitialize_and_upload_again<M: MultiPartsUploader>(
            scheduler: &SerialMultiPartsUploaderScheduler<M>,
            initialized: &mut M::AsyncInitializedParts,
            reinitialize_options: ReinitializeOptions,
        ) -> Option<ApiResult<Value>> {
            OptionFuture::from(
                scheduler
                    .multi_parts_uploader
                    .async_reinitialize_parts(initialized, reinitialize_options)
                    .await
                    .ok()
                    .map(|_| _upload_after_reinitialize(scheduler, initialized)),
            )
            .await
        }

        async fn _upload_after_reinitialize<M: MultiPartsUploader>(
            scheduler: &SerialMultiPartsUploaderScheduler<M>,
            initialized: &M::AsyncInitializedParts,
        ) -> ApiResult<Value> {
            let mut parts = Vec::with_capacity(4);
            while let Some(uploaded_part) = scheduler
                .multi_parts_uploader
                .async_upload_part(initialized, &scheduler.data_partition_provider)
                .await?
            {
                parts.push(uploaded_part);
            }
            scheduler
                .multi_parts_uploader
                .async_complete_parts(initialized, &parts)
                .await
        }
    }
}

#[cfg(feature = "async")]
#[cfg(test)]
mod tests {
    use super::{
        super::super::{
            data_source::AsyncDigestible, AsyncFileDataSource, FileSystemResumableRecorder, MultiPartsV1Uploader,
            MultiPartsV2Uploader, UploadManager, UploadTokenSigner,
        },
        *,
    };
    use anyhow::Result as AnyResult;
    use async_std::task::{sleep, spawn as spawn_task};
    use futures::{
        io::{copy as async_io_copy, sink as async_io_sink},
        AsyncRead,
    };
    use qiniu_apis::{
        credential::Credential,
        http::{
            AsyncRequest, AsyncReset, AsyncResponse, AsyncResponseResult, HeaderValue, HttpCaller, StatusCode,
            SyncRequest, SyncResponseResult,
        },
        http_client::{
            AsyncResponseBody, DirectChooser, ErrorRetrier, HttpClient, LimitedRetrier, NeverRetrier, Region,
            RequestRetrier, StaticRegionsProvider, NO_BACKOFF,
        },
    };
    use qiniu_utils::base64::urlsafe as urlsafe_base64;
    use rand::{thread_rng, RngCore};
    use serde_json::{json, to_vec as json_to_vec};
    use sha1::Sha1;
    use std::{
        io::{copy as io_copy, Read, Result as IoResult},
        sync::{
            atomic::{AtomicUsize, Ordering},
            Arc,
        },
        time::{Duration, SystemTime, UNIX_EPOCH},
    };
    use tempfile::{Builder as TempfileBuilder, TempPath};
    use text_io::scan as scan_text;

    const BLOCK_SIZE: u64 = 4 << 20;

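    // Each test below drives the scheduler against a scripted `HttpCaller`
    // that asserts on the requests it receives, exercising the
    // resume-and-recover paths end to end.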
    #[async_std::test]
    async fn test_serial_multi_parts_uploader_scheduler_with_async_multi_parts_v1_upload_with_recovery() -> AnyResult<()>
    {
        env_logger::builder().is_test(true).try_init().ok();

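        // Phase 1: mkblk succeeds and the part is recorded, but mkfile
        // answers with a non-retryable 400, so the upload fails while the
        // resumable record stays on disk.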
        #[derive(Debug, Default)]
        struct FakeHttpCaller {
            mkblk_counts: AtomicUsize,
        }

        impl HttpCaller for FakeHttpCaller {
            fn call(&self, _request: &mut SyncRequest<'_>) -> SyncResponseResult {
                unreachable!()
            }

            fn async_call<'a>(&'a self, request: &'a mut AsyncRequest<'_>) -> BoxFuture<'a, AsyncResponseResult> {
                Box::pin(async move {
                    if request.url().path().starts_with("/mkblk/") {
                        let blk_size: u64;
                        scan_text!(request.url().path().bytes() => "/mkblk/{}", blk_size);

                        match blk_size {
                            BLOCK_SIZE => {
                                assert_eq!(self.mkblk_counts.fetch_add(1, Ordering::Relaxed), 0);
                            }
                            _ => unreachable!(),
                        }
                        let body_len = size_of_async_reader(request.body_mut()).await.unwrap();
                        assert_eq!(body_len, blk_size);
                        let resp_body = json_to_vec(&json!({
                            "ctx": "===0===",
                            "checksum": sha1_of_async_reader(request.body_mut()).await.unwrap(),
                            "offset": blk_size,
                            "host": "http://fakeexample.com",
                            "expired_at": (SystemTime::now() + Duration::from_secs(3600)).duration_since(UNIX_EPOCH).unwrap().as_secs(),
                        }))
                        .unwrap();
                        Ok(AsyncResponse::builder()
                            .status_code(StatusCode::OK)
                            .header("x-reqid", HeaderValue::from_static("FakeReqid"))
                            .body(AsyncResponseBody::from_bytes(resp_body))
                            .build())
                    } else if request.url().path().starts_with("/mkfile/") {
                        let resp_body = json_to_vec(&json!({
                            "error": "test error",
                        }))
                        .unwrap();
                        Ok(AsyncResponse::builder()
                            .status_code(StatusCode::BAD_REQUEST)
                            .header("x-reqid", HeaderValue::from_static("FakeReqid"))
                            .body(AsyncResponseBody::from_bytes(resp_body))
                            .build())
                    } else {
                        unreachable!()
                    }
                })
            }
        }

        let resuming_files_dir = TempfileBuilder::new().tempdir()?;
        let file_path = spawn_task(async { random_file_path(BLOCK_SIZE) }).await?;

        {
            let uploader = SerialMultiPartsUploaderScheduler::new(MultiPartsV1Uploader::new(
                get_upload_manager(FakeHttpCaller::default()),
                FileSystemResumableRecorder::<Sha1>::new(resuming_files_dir.path()),
            ));
            let file_source = Box::new(AsyncFileDataSource::new(file_path.as_os_str()));
            let params = ObjectParams::builder()
                .region_provider(single_up_domain_region())
                .build();
            uploader.async_upload(file_source, params).await.unwrap_err();
        }

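        // Phase 2: the upload is resumed from the record, but mkfile now
        // reports the ctx as invalid (701). The scheduler reinitializes in
        // the same region, re-uploads the block, and fails again; the new
        // record expires in five seconds.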
        #[derive(Debug, Default)]
        struct FakeHttpCaller2 {
            mkblk_counts: AtomicUsize,
            mkfile_counts: AtomicUsize,
        }

        impl HttpCaller for FakeHttpCaller2 {
            fn call(&self, _request: &mut SyncRequest<'_>) -> SyncResponseResult {
                unreachable!()
            }

            fn async_call<'a>(&'a self, request: &'a mut AsyncRequest<'_>) -> BoxFuture<'a, AsyncResponseResult> {
                Box::pin(async move {
                    if request.url().path().starts_with("/mkblk/") {
                        let blk_size: u64;
                        scan_text!(request.url().path().bytes() => "/mkblk/{}", blk_size);

                        match blk_size {
                            BLOCK_SIZE => {
                                assert_eq!(self.mkblk_counts.fetch_add(1, Ordering::Relaxed), 0);
                            }
                            _ => unreachable!(),
                        }
                        let body_len = size_of_async_reader(request.body_mut()).await.unwrap();
                        assert_eq!(body_len, blk_size);
                        let resp_body = json_to_vec(&json!({
                            "ctx": "===0===",
                            "checksum": sha1_of_async_reader(request.body_mut()).await.unwrap(),
                            "offset": blk_size,
                            "host": "http://fakeexample.com",
                            "expired_at": (SystemTime::now() + Duration::from_secs(5)).duration_since(UNIX_EPOCH).unwrap().as_secs(),
                        }))
                        .unwrap();
                        Ok(AsyncResponse::builder()
                            .status_code(StatusCode::OK)
                            .header("x-reqid", HeaderValue::from_static("FakeReqid"))
                            .body(AsyncResponseBody::from_bytes(resp_body))
                            .build())
                    } else if request.url().path().starts_with("/mkfile/") {
                        assert!(self.mkfile_counts.fetch_add(1, Ordering::Relaxed) < 2);
                        let resp_body = json_to_vec(&json!({
                            "error": "invalid ctx",
                        }))
                        .unwrap();
                        Ok(AsyncResponse::builder()
                            .status_code(StatusCode::from_u16(701).unwrap())
                            .header("x-reqid", HeaderValue::from_static("FakeReqid"))
                            .body(AsyncResponseBody::from_bytes(resp_body))
                            .build())
                    } else {
                        unreachable!()
                    }
                })
            }
        }

        {
            let caller = Arc::new(FakeHttpCaller2::default());
            {
                let uploader = SerialMultiPartsUploaderScheduler::new(MultiPartsV1Uploader::new(
                    get_upload_manager(caller.to_owned()),
                    FileSystemResumableRecorder::<Sha1>::new(resuming_files_dir.path()),
                ));
                let file_source = Box::new(AsyncFileDataSource::new(file_path.as_os_str()));
                let params = ObjectParams::builder()
                    .region_provider(single_up_domain_region())
                    .build();
                uploader.async_upload(file_source, params).await.unwrap_err();
            }
            let caller = Arc::try_unwrap(caller).unwrap();
            assert_eq!(caller.mkblk_counts.into_inner(), 1);
            assert_eq!(caller.mkfile_counts.into_inner(), 2);
        }

        sleep(Duration::from_secs(5)).await;

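        // Phase 3: the recorded ctx has expired, so the upload starts over
        // from scratch and succeeds: one mkblk, one mkfile.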
        #[derive(Debug, Default)]
        struct FakeHttpCaller3 {
            mkblk_counts: AtomicUsize,
            mkfile_counts: AtomicUsize,
        }

        impl HttpCaller for FakeHttpCaller3 {
            fn call(&self, _request: &mut SyncRequest<'_>) -> SyncResponseResult {
                unreachable!()
            }

            fn async_call<'a>(&'a self, request: &'a mut AsyncRequest<'_>) -> BoxFuture<'a, AsyncResponseResult> {
                Box::pin(async move {
                    if request.url().path().starts_with("/mkblk/") {
                        let blk_size: u64;
                        scan_text!(request.url().path().bytes() => "/mkblk/{}", blk_size);

                        match blk_size {
                            BLOCK_SIZE => {
                                assert_eq!(self.mkblk_counts.fetch_add(1, Ordering::Relaxed), 0);
                            }
                            _ => unreachable!(),
                        }
                        let body_len = size_of_async_reader(request.body_mut()).await.unwrap();
                        assert_eq!(body_len, blk_size);
                        let resp_body = json_to_vec(&json!({
                            "ctx": "===0===",
                            "checksum": sha1_of_async_reader(request.body_mut()).await.unwrap(),
                            "offset": blk_size,
                            "host": "http://fakeexample.com",
                            "expired_at": (SystemTime::now() + Duration::from_secs(3600)).duration_since(UNIX_EPOCH).unwrap().as_secs(),
                        }))
                        .unwrap();
                        Ok(AsyncResponse::builder()
                            .status_code(StatusCode::OK)
                            .header("x-reqid", HeaderValue::from_static("FakeReqid"))
                            .body(AsyncResponseBody::from_bytes(resp_body))
                            .build())
                    } else if request.url().path().starts_with("/mkfile/") {
                        assert!(self.mkfile_counts.fetch_add(1, Ordering::Relaxed) < 2);
                        let resp_body = json_to_vec(&json!({
                            "ok": 1,
                        }))
                        .unwrap();
                        Ok(AsyncResponse::builder()
                            .status_code(StatusCode::OK)
                            .header("x-reqid", HeaderValue::from_static("FakeReqid"))
                            .body(AsyncResponseBody::from_bytes(resp_body))
                            .build())
                    } else {
                        unreachable!()
                    }
                })
            }
        }

        {
            let caller = Arc::new(FakeHttpCaller3::default());
            {
                let uploader = SerialMultiPartsUploaderScheduler::new(MultiPartsV1Uploader::new(
                    get_upload_manager(caller.to_owned()),
                    FileSystemResumableRecorder::<Sha1>::new(resuming_files_dir.path()),
                ));
                let file_source = Box::new(AsyncFileDataSource::new(file_path.as_os_str()));
                let params = ObjectParams::builder()
                    .region_provider(single_up_domain_region())
                    .build();
                let body = uploader.async_upload(file_source, params).await.unwrap();
                assert_eq!(body.get("ok").unwrap().as_i64(), Some(1));
            }
            let caller = Arc::try_unwrap(caller).unwrap();
            assert_eq!(caller.mkblk_counts.into_inner(), 1);
            assert_eq!(caller.mkfile_counts.into_inner(), 1);
        }

        Ok(())
    }

    #[async_std::test]
    async fn test_serial_multi_parts_uploader_scheduler_with_async_multi_parts_v2_upload_with_recovery() -> AnyResult<()>
    {
        env_logger::builder().is_test(true).try_init().ok();

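        // Phase 1: parts are initialized and uploaded, but completing them
        // answers with a non-retryable 400, leaving a resumable record with
        // an hour-long lifetime.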
        #[derive(Debug, Default)]
        struct FakeHttpCaller {
            init_parts_counts: AtomicUsize,
            upload_part_counts: AtomicUsize,
        }

        impl HttpCaller for FakeHttpCaller {
            fn call(&self, _request: &mut SyncRequest<'_>) -> SyncResponseResult {
                unreachable!()
            }

            #[cfg(feature = "async")]
            fn async_call<'a>(&'a self, request: &'a mut AsyncRequest<'_>) -> BoxFuture<'a, AsyncResponseResult> {
                Box::pin(async move {
                    if request.url().path() == "/buckets/fakebucket/objects/~/uploads" {
                        assert_eq!(self.upload_part_counts.load(Ordering::Relaxed), 0);
                        assert_eq!(self.init_parts_counts.fetch_add(1, Ordering::Relaxed), 0);
                        let resp_body = json_to_vec(&json!({
                            "uploadId": "fakeuploadid",
                            "expireAt": (SystemTime::now() + Duration::from_secs(3600)).duration_since(UNIX_EPOCH).unwrap().as_secs(),
                        }))
                        .unwrap();
                        Ok(AsyncResponse::builder()
                            .status_code(StatusCode::OK)
                            .header("x-reqid", HeaderValue::from_static("FakeReqid"))
                            .body(AsyncResponseBody::from_bytes(resp_body))
                            .build())
                    } else if request
                        .url()
                        .path()
                        .starts_with("/buckets/fakebucket/objects/~/uploads/fakeuploadid/")
                    {
                        let page_number: usize;
                        scan_text!(request.url().path().bytes() => "/buckets/fakebucket/objects/~/uploads/fakeuploadid/{}", page_number);
                        assert_eq!(page_number, 1);
                        let body_len = size_of_async_reader(request.body_mut()).await.unwrap();
                        assert_eq!(body_len, BLOCK_SIZE);
                        assert_eq!(self.upload_part_counts.fetch_add(1, Ordering::Relaxed), 0);
                        let resp_body = json_to_vec(&json!({
                            "etag": format!("==={page_number}==="),
                            "md5": "fake-md5",
                        }))
                        .unwrap();
                        Ok(AsyncResponse::builder()
                            .status_code(StatusCode::OK)
                            .header("x-reqid", HeaderValue::from_static("FakeReqid"))
                            .body(AsyncResponseBody::from_bytes(resp_body))
                            .build())
                    } else if request
                        .url()
                        .path()
                        .starts_with("/buckets/fakebucket/objects/~/uploads/fakeuploadid")
                    {
                        let resp_body = json_to_vec(&json!({
                            "error": "test error",
                        }))
                        .unwrap();
                        Ok(AsyncResponse::builder()
                            .status_code(StatusCode::BAD_REQUEST)
                            .header("x-reqid", HeaderValue::from_static("FakeReqid"))
                            .body(AsyncResponseBody::from_bytes(resp_body))
                            .build())
                    } else {
                        unreachable!()
                    }
                })
            }
        }

        let resuming_files_dir = TempfileBuilder::new().tempdir()?;
        let file_path = spawn_task(async { random_file_path(BLOCK_SIZE) }).await?;

        {
            let caller = Arc::new(FakeHttpCaller::default());
            {
                let uploader = SerialMultiPartsUploaderScheduler::new(MultiPartsV2Uploader::new(
                    get_upload_manager(caller.to_owned()),
                    FileSystemResumableRecorder::<Sha1>::new(resuming_files_dir.path()),
                ));
                let file_source = Box::new(AsyncFileDataSource::new(file_path.as_os_str()));
                let params = ObjectParams::builder()
                    .region_provider(single_up_domain_region())
                    .build();
                uploader.async_upload(file_source, params).await.unwrap_err();
            }
            let caller = Arc::try_unwrap(caller).unwrap();
            assert_eq!(caller.init_parts_counts.into_inner(), 1);
            assert_eq!(caller.upload_part_counts.into_inner(), 1);
        }

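        // Phase 2: resuming skips initialization and part upload, but
        // completion reports the uploadId as gone (612). The scheduler
        // reinitializes in the same region, re-uploads the part, and fails
        // again; the new record expires in five seconds.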
        #[derive(Debug, Default)]
        struct FakeHttpCaller2 {
            init_parts_counts: AtomicUsize,
            upload_part_counts: AtomicUsize,
        }

        impl HttpCaller for FakeHttpCaller2 {
            fn call(&self, _request: &mut SyncRequest<'_>) -> SyncResponseResult {
                unreachable!()
            }

            #[cfg(feature = "async")]
            fn async_call<'a>(&'a self, request: &'a mut AsyncRequest<'_>) -> BoxFuture<'a, AsyncResponseResult> {
                Box::pin(async move {
                    if request.url().path() == "/buckets/fakebucket/objects/~/uploads" {
                        assert_eq!(self.upload_part_counts.load(Ordering::Relaxed), 0);
                        assert_eq!(self.init_parts_counts.fetch_add(1, Ordering::Relaxed), 0);
                        let resp_body = json_to_vec(&json!({
                            "uploadId": "fakeuploadid",
                            "expireAt": (SystemTime::now() + Duration::from_secs(5)).duration_since(UNIX_EPOCH).unwrap().as_secs(),
                        }))
                        .unwrap();
                        Ok(AsyncResponse::builder()
                            .status_code(StatusCode::OK)
                            .header("x-reqid", HeaderValue::from_static("FakeReqid"))
                            .body(AsyncResponseBody::from_bytes(resp_body))
                            .build())
                    } else if request
                        .url()
                        .path()
                        .starts_with("/buckets/fakebucket/objects/~/uploads/fakeuploadid/")
                    {
                        let page_number: usize;
                        scan_text!(request.url().path().bytes() => "/buckets/fakebucket/objects/~/uploads/fakeuploadid/{}", page_number);
                        assert_eq!(page_number, 1);
                        let body_len = size_of_async_reader(request.body_mut()).await.unwrap();
                        assert_eq!(body_len, BLOCK_SIZE);
                        assert_eq!(self.upload_part_counts.fetch_add(1, Ordering::Relaxed), 0);
                        let resp_body = json_to_vec(&json!({
                            "etag": format!("==={page_number}==="),
                            "md5": "fake-md5",
                        }))
                        .unwrap();
                        Ok(AsyncResponse::builder()
                            .status_code(StatusCode::OK)
                            .header("x-reqid", HeaderValue::from_static("FakeReqid"))
                            .body(AsyncResponseBody::from_bytes(resp_body))
                            .build())
                    } else if request
                        .url()
                        .path()
                        .starts_with("/buckets/fakebucket/objects/~/uploads/fakeuploadid")
                    {
                        let resp_body = json_to_vec(&json!({
                            "error": "no such uploadId",
                        }))
                        .unwrap();
                        Ok(AsyncResponse::builder()
                            .status_code(StatusCode::from_u16(612).unwrap())
                            .header("x-reqid", HeaderValue::from_static("FakeReqid"))
                            .body(AsyncResponseBody::from_bytes(resp_body))
                            .build())
                    } else {
                        unreachable!()
                    }
                })
            }
        }

        {
            let caller = Arc::new(FakeHttpCaller2::default());
            {
                let uploader = SerialMultiPartsUploaderScheduler::new(MultiPartsV2Uploader::new(
                    get_upload_manager(caller.to_owned()),
                    FileSystemResumableRecorder::<Sha1>::new(resuming_files_dir.path()),
                ));
                let file_source = Box::new(AsyncFileDataSource::new(file_path.as_os_str()));
                let params = ObjectParams::builder()
                    .region_provider(single_up_domain_region())
                    .build();
                uploader.async_upload(file_source, params).await.unwrap_err();
            }
            let caller = Arc::try_unwrap(caller).unwrap();
            assert_eq!(caller.init_parts_counts.into_inner(), 1);
            assert_eq!(caller.upload_part_counts.into_inner(), 1);
        }

        sleep(Duration::from_secs(5)).await;

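        // Phase 3: the record has expired, so a fresh upload runs from
        // scratch and succeeds.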
        #[derive(Debug, Default)]
        struct FakeHttpCaller3 {
            init_parts_counts: AtomicUsize,
            upload_part_counts: AtomicUsize,
        }

        impl HttpCaller for FakeHttpCaller3 {
            fn call(&self, _request: &mut SyncRequest<'_>) -> SyncResponseResult {
                unreachable!()
            }

            #[cfg(feature = "async")]
            fn async_call<'a>(&'a self, request: &'a mut AsyncRequest<'_>) -> BoxFuture<'a, AsyncResponseResult> {
                Box::pin(async move {
                    if request.url().path() == "/buckets/fakebucket/objects/~/uploads" {
                        assert_eq!(self.upload_part_counts.load(Ordering::Relaxed), 0);
                        assert_eq!(self.init_parts_counts.fetch_add(1, Ordering::Relaxed), 0);
                        let resp_body = json_to_vec(&json!({
                            "uploadId": "fakeuploadid",
                            "expireAt": (SystemTime::now() + Duration::from_secs(5)).duration_since(UNIX_EPOCH).unwrap().as_secs(),
                        }))
                        .unwrap();
                        Ok(AsyncResponse::builder()
                            .status_code(StatusCode::OK)
                            .header("x-reqid", HeaderValue::from_static("FakeReqid"))
                            .body(AsyncResponseBody::from_bytes(resp_body))
                            .build())
                    } else if request
                        .url()
                        .path()
                        .starts_with("/buckets/fakebucket/objects/~/uploads/fakeuploadid/")
                    {
                        let page_number: usize;
                        scan_text!(request.url().path().bytes() => "/buckets/fakebucket/objects/~/uploads/fakeuploadid/{}", page_number);
                        assert_eq!(page_number, 1);
                        let body_len = size_of_async_reader(request.body_mut()).await.unwrap();
                        assert_eq!(body_len, BLOCK_SIZE);
                        assert_eq!(self.upload_part_counts.fetch_add(1, Ordering::Relaxed), 0);
                        let resp_body = json_to_vec(&json!({
                            "etag": format!("==={page_number}==="),
                            "md5": "fake-md5",
                        }))
                        .unwrap();
                        Ok(AsyncResponse::builder()
                            .status_code(StatusCode::OK)
                            .header("x-reqid", HeaderValue::from_static("FakeReqid"))
                            .body(AsyncResponseBody::from_bytes(resp_body))
                            .build())
                    } else if request
                        .url()
                        .path()
                        .starts_with("/buckets/fakebucket/objects/~/uploads/fakeuploadid")
                    {
                        let resp_body = json_to_vec(&json!({
                            "ok": 1,
                        }))
                        .unwrap();
                        Ok(AsyncResponse::builder()
                            .status_code(StatusCode::OK)
                            .header("x-reqid", HeaderValue::from_static("FakeReqid"))
                            .body(AsyncResponseBody::from_bytes(resp_body))
                            .build())
                    } else {
                        unreachable!()
                    }
                })
            }
        }

        {
            let caller = Arc::new(FakeHttpCaller3::default());
            {
                let uploader = SerialMultiPartsUploaderScheduler::new(MultiPartsV2Uploader::new(
                    get_upload_manager(caller.to_owned()),
                    FileSystemResumableRecorder::<Sha1>::new(resuming_files_dir.path()),
                ));
                let file_source = Box::new(AsyncFileDataSource::new(file_path.as_os_str()));
                let params = ObjectParams::builder()
                    .region_provider(single_up_domain_region())
                    .build();
                let body = uploader.async_upload(file_source, params).await.unwrap();
                assert_eq!(body.get("ok").unwrap().as_i64(), Some(1));
            }
            let caller = Arc::try_unwrap(caller).unwrap();
            assert_eq!(caller.init_parts_counts.into_inner(), 1);
            assert_eq!(caller.upload_part_counts.into_inner(), 1);
        }

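        // Phase 4: with two regions configured, completion fails with a
        // retryable 599 in the first region, so the scheduler reinitializes
        // in the second region and succeeds there.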
        #[derive(Debug, Default)]
        struct FakeHttpCaller4 {
            init_parts_counts: AtomicUsize,
            upload_part_counts: AtomicUsize,
            complete_parts_counts: AtomicUsize,
        }

        impl HttpCaller for FakeHttpCaller4 {
            fn call(&self, _request: &mut SyncRequest<'_>) -> SyncResponseResult {
                unreachable!()
            }

            #[cfg(feature = "async")]
            fn async_call<'a>(&'a self, request: &'a mut AsyncRequest<'_>) -> BoxFuture<'a, AsyncResponseResult> {
                Box::pin(async move {
                    if request.url().path() == "/buckets/fakebucket/objects/~/uploads" {
                        if request.url().host() == Some("fakeup.example.com") {
                            assert_eq!(self.init_parts_counts.fetch_add(1, Ordering::Relaxed), 0);
                            assert_eq!(self.upload_part_counts.load(Ordering::Relaxed), 0);
                            assert_eq!(self.complete_parts_counts.load(Ordering::Relaxed), 0);
                        } else {
                            assert_eq!(self.init_parts_counts.fetch_add(1, Ordering::Relaxed), 1);
                            assert_eq!(self.upload_part_counts.load(Ordering::Relaxed), 1);
                            assert_eq!(self.complete_parts_counts.load(Ordering::Relaxed), 1);
                        }
                        let resp_body = json_to_vec(&json!({
                            "uploadId": "fakeuploadid",
                            "expireAt": (SystemTime::now() + Duration::from_secs(5)).duration_since(UNIX_EPOCH).unwrap().as_secs(),
                        }))
                        .unwrap();
                        Ok(AsyncResponse::builder()
                            .status_code(StatusCode::OK)
                            .header("x-reqid", HeaderValue::from_static("FakeReqid"))
                            .body(AsyncResponseBody::from_bytes(resp_body))
                            .build())
                    } else if request
                        .url()
                        .path()
                        .starts_with("/buckets/fakebucket/objects/~/uploads/fakeuploadid/")
                    {
                        let page_number: usize;
                        scan_text!(request.url().path().bytes() => "/buckets/fakebucket/objects/~/uploads/fakeuploadid/{}", page_number);
                        assert_eq!(page_number, 1);
                        let body_len = size_of_async_reader(request.body_mut()).await.unwrap();
                        assert_eq!(body_len, BLOCK_SIZE);
                        if request.url().host() == Some("fakeup.example.com") {
                            assert_eq!(self.init_parts_counts.load(Ordering::Relaxed), 1);
                            assert_eq!(self.upload_part_counts.fetch_add(1, Ordering::Relaxed), 0);
                            assert_eq!(self.complete_parts_counts.load(Ordering::Relaxed), 0);
                        } else {
                            assert_eq!(self.init_parts_counts.load(Ordering::Relaxed), 2);
                            assert_eq!(self.upload_part_counts.fetch_add(1, Ordering::Relaxed), 1);
                            assert_eq!(self.complete_parts_counts.load(Ordering::Relaxed), 1);
                        }
                        let resp_body = json_to_vec(&json!({
                            "etag": format!("==={page_number}==="),
                            "md5": "fake-md5",
                        }))
                        .unwrap();
                        Ok(AsyncResponse::builder()
                            .status_code(StatusCode::OK)
                            .header("x-reqid", HeaderValue::from_static("FakeReqid"))
                            .body(AsyncResponseBody::from_bytes(resp_body))
                            .build())
                    } else if request
                        .url()
                        .path()
                        .starts_with("/buckets/fakebucket/objects/~/uploads/fakeuploadid")
                    {
                        if request.url().host() == Some("fakeup.example.com") {
                            assert_eq!(self.init_parts_counts.load(Ordering::Relaxed), 1);
                            assert_eq!(self.upload_part_counts.load(Ordering::Relaxed), 1);
                            assert_eq!(self.complete_parts_counts.fetch_add(1, Ordering::Relaxed), 0);
                            let resp_body = json_to_vec(&json!({
                                "error": "test error",
                            }))
                            .unwrap();
                            Ok(AsyncResponse::builder()
                                .status_code(StatusCode::from_u16(599).unwrap())
                                .header("x-reqid", HeaderValue::from_static("FakeReqid"))
                                .body(AsyncResponseBody::from_bytes(resp_body))
                                .build())
                        } else {
                            assert_eq!(self.init_parts_counts.load(Ordering::Relaxed), 2);
                            assert_eq!(self.upload_part_counts.load(Ordering::Relaxed), 2);
                            assert_eq!(self.complete_parts_counts.fetch_add(1, Ordering::Relaxed), 1);
                            let resp_body = json_to_vec(&json!({
                                "ok": 1,
                            }))
                            .unwrap();
                            Ok(AsyncResponse::builder()
                                .status_code(StatusCode::OK)
                                .header("x-reqid", HeaderValue::from_static("FakeReqid"))
                                .body(AsyncResponseBody::from_bytes(resp_body))
                                .build())
                        }
                    } else {
                        unreachable!()
                    }
                })
            }
        }

        {
            let caller = Arc::new(FakeHttpCaller4::default());
            {
                let uploader = SerialMultiPartsUploaderScheduler::new(MultiPartsV2Uploader::new(
                    get_upload_manager_with_retrier(caller.to_owned(), LimitedRetrier::new(ErrorRetrier, 0)),
                    FileSystemResumableRecorder::<Sha1>::new(resuming_files_dir.path()),
                ));
                let file_source = Box::new(AsyncFileDataSource::new(file_path.as_os_str()));
                let params = ObjectParams::builder()
                    .region_provider(double_up_domain_region())
                    .build();
                let body = uploader.async_upload(file_source, params).await.unwrap();
                assert_eq!(body.get("ok").unwrap().as_i64(), Some(1));
            }
            let caller = Arc::try_unwrap(caller).unwrap();
            assert_eq!(caller.init_parts_counts.into_inner(), 2);
            assert_eq!(caller.upload_part_counts.into_inner(), 2);
            assert_eq!(caller.complete_parts_counts.into_inner(), 2);
        }

        Ok(())
    }

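    // Measures the body length by draining the reader, then rewinds it so
    // the request body can be read again.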
    async fn size_of_async_reader<R: AsyncRead + AsyncReset + Unpin>(mut reader: &mut R) -> IoResult<u64> {
        let size = async_io_copy(&mut reader, &mut async_io_sink()).await?;
        reader.reset().await?;
        Ok(size)
    }

    async fn sha1_of_async_reader<R: AsyncRead + AsyncReset + Unpin + Send>(reader: &mut R) -> IoResult<String> {
        Ok(urlsafe_base64(
            AsyncDigestible::<Sha1>::digest(reader).await?.as_slice(),
        ))
    }

    fn get_upload_manager(caller: impl HttpCaller + 'static) -> UploadManager {
        get_upload_manager_with_retrier(caller, NeverRetrier)
    }

    fn get_upload_manager_with_retrier(
        caller: impl HttpCaller + 'static,
        retrier: impl RequestRetrier + 'static,
    ) -> UploadManager {
        UploadManager::builder(UploadTokenSigner::new_credential_provider(
            get_credential(),
            "fakebucket",
            Duration::from_secs(100),
        ))
        .http_client(
            HttpClient::builder(caller)
                .chooser(DirectChooser)
                .request_retrier(retrier)
                .backoff(NO_BACKOFF)
                .build(),
        )
        .build()
    }

    fn get_credential() -> Credential {
        Credential::new("fakeaccesskey", "fakesecretkey")
    }

    fn single_up_domain_region() -> Region {
        Region::builder("chaotic")
            .add_up_preferred_endpoint(("fakeup.example.com".to_owned(), 8080).into())
            .build()
    }

    fn double_up_domain_region() -> StaticRegionsProvider {
        let mut provider = StaticRegionsProvider::new(single_up_domain_region());
        provider.append(
            Region::builder("chaotic2")
                .add_up_preferred_endpoint(("fakeup.example2.com".to_owned(), 8080).into())
                .build(),
        );
        provider
    }

    fn random_file_path(size: u64) -> IoResult<TempPath> {
        let mut tempfile = TempfileBuilder::new().tempfile()?;
        let rng = Box::new(thread_rng()) as Box<dyn RngCore>;
        io_copy(&mut rng.take(size), &mut tempfile)?;
        Ok(tempfile.into_temp_path())
    }
}