qiniu_upload_manager/scheduler/
serial_multi_parts_uploader_scheduler.rs

1use super::{
2    super::{
3        multi_parts_uploader::{MultiPartsUploaderExt, PartsExpiredError},
4        ConcurrencyProvider, DataPartitionProvider, DataSource, FixedDataPartitionProvider, MultiPartsUploader,
5        ObjectParams, ReinitializeOptions, UploadedPart,
6    },
7    utils::{
8        keep_original_region_options, need_to_retry, no_region_tried_error, remove_used_region_from_regions,
9        specify_region_options, UploadPartsError, UploadResumedPartsError,
10    },
11    MultiPartsUploaderScheduler,
12};
13use qiniu_apis::http_client::{ApiResult, ResponseError};
14use serde_json::Value;
15use std::num::NonZeroU64;
16
17#[cfg(feature = "async")]
18use {
19    super::AsyncDataSource,
20    futures::future::{BoxFuture, OptionFuture},
21};
22
23/// 串行分片上传调度器
24///
25/// 不启动任何线程,仅在本地串行上传分片。
26///
27/// ### 用串行分片上传调度器上传文件
28///
29/// ###### 阻塞代码示例
30///
31/// ```
32/// use qiniu_upload_manager::{
33///     apis::credential::Credential, prelude::*, FileSystemResumableRecorder, MultiPartsV2Uploader,
34///     ObjectParams, SerialMultiPartsUploaderScheduler, UploadManager, UploadTokenSigner,
35/// };
36/// use std::time::Duration;
37/// use sha1::Sha1;
38///
39/// # fn example() -> anyhow::Result<()> {
40/// let bucket_name = "test-bucket";
41/// let object_name = "test-object";
42/// let upload_manager = UploadManager::builder(UploadTokenSigner::new_credential_provider(
43///     Credential::new("abcdefghklmnopq", "1234567890"),
44///     bucket_name,
45///     Duration::from_secs(3600),
46/// ))
47/// .build();
48/// let params = ObjectParams::builder().object_name(object_name).file_name(object_name).build();
49/// let mut scheduler = SerialMultiPartsUploaderScheduler::new(MultiPartsV2Uploader::new(
50///     upload_manager,
51///     FileSystemResumableRecorder::<Sha1>::default(),
52/// ));
53/// scheduler.upload_path("/home/qiniu/test.png", params)?;
54/// # Ok(())
55/// # }
56/// ```
57///
58/// ###### 异步代码示例
59///
60/// ```
61/// use qiniu_upload_manager::{
62///     apis::credential::Credential, prelude::*, FileSystemResumableRecorder, MultiPartsV2Uploader,
63///     ObjectParams, SerialMultiPartsUploaderScheduler, UploadManager, UploadTokenSigner,
64/// };
65/// use std::time::Duration;
66/// use sha1::Sha1;
67///
68/// # async fn example() -> anyhow::Result<()> {
69/// let bucket_name = "test-bucket";
70/// let object_name = "test-object";
71/// let upload_manager = UploadManager::builder(UploadTokenSigner::new_credential_provider(
72///     Credential::new("abcdefghklmnopq", "1234567890"),
73///     bucket_name,
74///     Duration::from_secs(3600),
75/// ))
76/// .build();
77/// let params = ObjectParams::builder().object_name(object_name).file_name(object_name).build();
78/// let mut scheduler = SerialMultiPartsUploaderScheduler::new(MultiPartsV2Uploader::new(
79///     upload_manager,
80///     FileSystemResumableRecorder::<Sha1>::default(),
81/// ));
82/// scheduler.async_upload_path("/home/qiniu/test.png", params).await?;
83/// # Ok(())
84/// # }
85/// ```
86#[derive(Debug, Clone)]
87pub struct SerialMultiPartsUploaderScheduler<M: MultiPartsUploader> {
88    data_partition_provider: Box<dyn DataPartitionProvider>,
89    multi_parts_uploader: M,
90}
91
92impl<M: MultiPartsUploader> SerialMultiPartsUploaderScheduler<M> {
93    /// 创建串行分片上传调度器
94    #[inline]
95    pub fn new(multi_parts_uploader: M) -> Self {
96        Self {
97            data_partition_provider: Box::new(FixedDataPartitionProvider::new_with_non_zero_part_size(
98                #[allow(unsafe_code)]
99                unsafe {
100                    NonZeroU64::new_unchecked(1 << 22)
101                },
102            )),
103            multi_parts_uploader,
104        }
105    }
106
107    /// 获取分片大小提供者
108    #[inline]
109    pub fn data_partition_provider(&self) -> &dyn DataPartitionProvider {
110        &self.data_partition_provider
111    }
112}
113
114impl<M: MultiPartsUploader> MultiPartsUploaderScheduler<M::HashAlgorithm> for SerialMultiPartsUploaderScheduler<M> {
115    fn set_concurrency_provider(&mut self, _concurrency_provider: Box<dyn ConcurrencyProvider>) {}
116
117    fn set_data_partition_provider(&mut self, data_partition_provider: Box<dyn DataPartitionProvider>) {
118        self.data_partition_provider = data_partition_provider;
119    }
120
121    fn upload(&self, source: Box<dyn DataSource<M::HashAlgorithm>>, params: ObjectParams) -> ApiResult<Value> {
122        return match _resume_and_upload(self, source.to_owned(), params.to_owned()) {
123            None => match _try_to_upload_to_all_regions(self, source, params, None) {
124                Ok(None) => Err(no_region_tried_error()),
125                Ok(Some(value)) => Ok(value),
126                Err(err) => Err(err),
127            },
128            Some(Err(UploadPartsError { err, .. })) if !need_to_retry(&err) => Err(err),
129            Some(Err(UploadPartsError { initialized, err })) => {
130                match _try_to_upload_to_all_regions(self, source, params, initialized) {
131                    Ok(None) => Err(err),
132                    Ok(Some(value)) => Ok(value),
133                    Err(err) => Err(err),
134                }
135            }
136            Some(Ok(value)) => Ok(value),
137        };
138
139        fn _resume_and_upload<M: MultiPartsUploader>(
140            scheduler: &SerialMultiPartsUploaderScheduler<M>,
141            source: Box<dyn DataSource<M::HashAlgorithm>>,
142            params: ObjectParams,
143        ) -> Option<Result<Value, UploadPartsError<M::InitializedParts>>> {
144            _upload_resumed_parts(scheduler, source, params).map(|result| match result {
145                Ok(value) => Ok(value),
146                Err(UploadResumedPartsError {
147                    err,
148                    resumed: true,
149                    initialized: Some(mut initialized),
150                }) if err.extensions().get::<PartsExpiredError>().is_some() => {
151                    match _reinitialize_and_upload_again(scheduler, &mut initialized, keep_original_region_options()) {
152                        Some(Ok(value)) => Ok(value),
153                        Some(Err(err)) => Err(UploadPartsError::new(err, Some(initialized))),
154                        None => Err(UploadPartsError::new(err, Some(initialized))),
155                    }
156                }
157                Err(UploadResumedPartsError { err, initialized, .. }) => Err(UploadPartsError::new(err, initialized)),
158            })
159        }
160
161        fn _upload_resumed_parts<M: MultiPartsUploader>(
162            scheduler: &SerialMultiPartsUploaderScheduler<M>,
163            source: Box<dyn DataSource<M::HashAlgorithm>>,
164            params: ObjectParams,
165        ) -> Option<Result<Value, UploadResumedPartsError<M::InitializedParts>>> {
166            scheduler
167                .multi_parts_uploader
168                .try_to_resume_parts(source, params)
169                .map(|initialized| {
170                    _upload_after_initialize(scheduler, &initialized)
171                        .map_err(|(err, resumed)| UploadResumedPartsError::new(err, resumed, Some(initialized)))
172                })
173        }
174
175        fn _try_to_upload_to_all_regions<M: MultiPartsUploader>(
176            scheduler: &SerialMultiPartsUploaderScheduler<M>,
177            source: Box<dyn DataSource<M::HashAlgorithm>>,
178            params: ObjectParams,
179            mut initialized: Option<M::InitializedParts>,
180        ) -> ApiResult<Option<Value>> {
181            let mut regions = scheduler
182                .multi_parts_uploader
183                .get_bucket_regions(&params)
184                .map(|r| r.into_regions())?;
185            if let Some(initialized) = &initialized {
186                remove_used_region_from_regions(&mut regions, initialized);
187            }
188            let mut last_err = None;
189            for region in regions {
190                let initialized_result = if let Some(mut initialized) = initialized.take() {
191                    scheduler
192                        .multi_parts_uploader
193                        .reinitialize_parts(&mut initialized, specify_region_options(region))
194                        .map(|_| initialized)
195                } else {
196                    scheduler
197                        .multi_parts_uploader
198                        .initialize_parts(source.to_owned(), params.to_owned())
199                };
200                let new_initialized = match initialized_result {
201                    Ok(new_initialized) => {
202                        initialized = Some(new_initialized.to_owned());
203                        new_initialized
204                    }
205                    Err(err) => {
206                        let to_retry = need_to_retry(&err);
207                        last_err = Some(err);
208                        if to_retry {
209                            continue;
210                        } else {
211                            break;
212                        }
213                    }
214                };
215                match _upload_after_reinitialize(scheduler, &new_initialized) {
216                    Ok(value) => {
217                        return Ok(Some(value));
218                    }
219                    Err(err) => {
220                        let to_retry = need_to_retry(&err);
221                        last_err = Some(err);
222                        if to_retry {
223                            continue;
224                        } else {
225                            break;
226                        }
227                    }
228                }
229            }
230            last_err.map_or(Ok(None), Err)
231        }
232
233        fn _upload_after_initialize<M: MultiPartsUploader>(
234            scheduler: &SerialMultiPartsUploaderScheduler<M>,
235            initialized: &M::InitializedParts,
236        ) -> Result<Value, (ResponseError, bool)> {
237            let mut parts = Vec::with_capacity(4);
238            let mut resumed = false;
239            loop {
240                match scheduler
241                    .multi_parts_uploader
242                    .upload_part(initialized, &scheduler.data_partition_provider)
243                {
244                    Ok(Some(uploaded_part)) => {
245                        if uploaded_part.resumed() {
246                            resumed = true;
247                        }
248                        parts.push(uploaded_part);
249                    }
250                    Ok(None) => break,
251                    Err(err) => return Err((err, resumed)),
252                }
253            }
254            scheduler
255                .multi_parts_uploader
256                .complete_parts(initialized, &parts)
257                .map_err(|err| (err, resumed))
258        }
259
260        fn _reinitialize_and_upload_again<M: MultiPartsUploader>(
261            scheduler: &SerialMultiPartsUploaderScheduler<M>,
262            initialized: &mut M::InitializedParts,
263            reinitialize_options: ReinitializeOptions,
264        ) -> Option<ApiResult<Value>> {
265            scheduler
266                .multi_parts_uploader
267                .reinitialize_parts(initialized, reinitialize_options)
268                .ok()
269                .map(|_| _upload_after_reinitialize(scheduler, initialized))
270        }
271
272        fn _upload_after_reinitialize<M: MultiPartsUploader>(
273            scheduler: &SerialMultiPartsUploaderScheduler<M>,
274            initialized: &M::InitializedParts,
275        ) -> ApiResult<Value> {
276            let mut parts = Vec::with_capacity(4);
277            while let Some(uploaded_part) = scheduler
278                .multi_parts_uploader
279                .upload_part(initialized, &scheduler.data_partition_provider)?
280            {
281                parts.push(uploaded_part);
282            }
283            scheduler.multi_parts_uploader.complete_parts(initialized, &parts)
284        }
285    }
286
287    #[cfg(feature = "async")]
288    #[cfg_attr(feature = "docs", doc(cfg(feature = "async")))]
289    fn async_upload(
290        &self,
291        source: Box<dyn AsyncDataSource<M::HashAlgorithm>>,
292        params: ObjectParams,
293    ) -> BoxFuture<ApiResult<Value>> {
294        return Box::pin(async move {
295            match _resume_and_upload(self, source.to_owned(), params.to_owned()).await {
296                None => match _try_to_upload_to_all_regions(self, source, params, None).await {
297                    Ok(None) => Err(no_region_tried_error()),
298                    Ok(Some(value)) => Ok(value),
299                    Err(err) => Err(err),
300                },
301                Some(Err(UploadPartsError { err, .. })) if !need_to_retry(&err) => Err(err),
302                Some(Err(UploadPartsError { initialized, err })) => {
303                    match _try_to_upload_to_all_regions(self, source, params, initialized).await {
304                        Ok(None) => Err(err),
305                        Ok(Some(value)) => Ok(value),
306                        Err(err) => Err(err),
307                    }
308                }
309                Some(Ok(value)) => Ok(value),
310            }
311        });
312
313        async fn _resume_and_upload<M: MultiPartsUploader>(
314            scheduler: &SerialMultiPartsUploaderScheduler<M>,
315            source: Box<dyn AsyncDataSource<M::HashAlgorithm>>,
316            params: ObjectParams,
317        ) -> Option<Result<Value, UploadPartsError<M::AsyncInitializedParts>>> {
318            OptionFuture::from(
319                _upload_resumed_parts(scheduler, source, params)
320                    .await
321                    .map(|result| async move {
322                        match result {
323                            Ok(value) => Ok(value),
324                            Err(UploadResumedPartsError {
325                                err,
326                                resumed: true,
327                                initialized: Some(mut initialized),
328                            }) if err.extensions().get::<PartsExpiredError>().is_some() => {
329                                match _reinitialize_and_upload_again(
330                                    scheduler,
331                                    &mut initialized,
332                                    keep_original_region_options(),
333                                )
334                                .await
335                                {
336                                    Some(Ok(value)) => Ok(value),
337                                    Some(Err(err)) => Err(UploadPartsError::new(err, Some(initialized))),
338                                    None => Err(UploadPartsError::new(err, Some(initialized))),
339                                }
340                            }
341                            Err(UploadResumedPartsError { err, initialized, .. }) => {
342                                Err(UploadPartsError::new(err, initialized))
343                            }
344                        }
345                    }),
346            )
347            .await
348        }
349
350        async fn _upload_resumed_parts<M: MultiPartsUploader>(
351            scheduler: &SerialMultiPartsUploaderScheduler<M>,
352            source: Box<dyn AsyncDataSource<M::HashAlgorithm>>,
353            params: ObjectParams,
354        ) -> Option<Result<Value, UploadResumedPartsError<M::AsyncInitializedParts>>> {
355            OptionFuture::from(
356                scheduler
357                    .multi_parts_uploader
358                    .try_to_async_resume_parts(source, params)
359                    .await
360                    .map(|initialized| async move {
361                        _upload_after_initialize(scheduler, &initialized)
362                            .await
363                            .map_err(|(err, resumed)| UploadResumedPartsError::new(err, resumed, Some(initialized)))
364                    }),
365            )
366            .await
367        }
368
369        async fn _try_to_upload_to_all_regions<M: MultiPartsUploader>(
370            scheduler: &SerialMultiPartsUploaderScheduler<M>,
371            source: Box<dyn AsyncDataSource<M::HashAlgorithm>>,
372            params: ObjectParams,
373            mut initialized: Option<M::AsyncInitializedParts>,
374        ) -> ApiResult<Option<Value>> {
375            let mut regions = scheduler
376                .multi_parts_uploader
377                .async_get_bucket_regions(&params)
378                .await
379                .map(|r| r.into_regions())?;
380            if let Some(initialized) = &initialized {
381                remove_used_region_from_regions(&mut regions, initialized);
382            }
383            let mut last_err = None;
384            for region in regions {
385                let initialized_result = if let Some(mut initialized) = initialized.take() {
386                    scheduler
387                        .multi_parts_uploader
388                        .async_reinitialize_parts(&mut initialized, specify_region_options(region))
389                        .await
390                        .map(|_| initialized)
391                } else {
392                    scheduler
393                        .multi_parts_uploader
394                        .async_initialize_parts(source.to_owned(), params.to_owned())
395                        .await
396                };
397                let new_initialized = match initialized_result {
398                    Ok(new_initialized) => {
399                        initialized = Some(new_initialized.to_owned());
400                        new_initialized
401                    }
402                    Err(err) => {
403                        let to_retry = need_to_retry(&err);
404                        last_err = Some(err);
405                        if to_retry {
406                            continue;
407                        } else {
408                            break;
409                        }
410                    }
411                };
412                match _upload_after_reinitialize(scheduler, &new_initialized).await {
413                    Ok(value) => {
414                        return Ok(Some(value));
415                    }
416                    Err(err) => {
417                        let to_retry = need_to_retry(&err);
418                        last_err = Some(err);
419                        if to_retry {
420                            continue;
421                        } else {
422                            break;
423                        }
424                    }
425                }
426            }
427            last_err.map_or(Ok(None), Err)
428        }
429
430        async fn _upload_after_initialize<M: MultiPartsUploader>(
431            scheduler: &SerialMultiPartsUploaderScheduler<M>,
432            initialized: &M::AsyncInitializedParts,
433        ) -> Result<Value, (ResponseError, bool)> {
434            let mut parts = Vec::with_capacity(4);
435            let mut resumed = false;
436            loop {
437                match scheduler
438                    .multi_parts_uploader
439                    .async_upload_part(initialized, &scheduler.data_partition_provider)
440                    .await
441                {
442                    Ok(Some(uploaded_part)) => {
443                        if uploaded_part.resumed() {
444                            resumed = true;
445                        }
446                        parts.push(uploaded_part);
447                    }
448                    Ok(None) => break,
449                    Err(err) => return Err((err, resumed)),
450                }
451            }
452            scheduler
453                .multi_parts_uploader
454                .async_complete_parts(initialized, &parts)
455                .await
456                .map_err(|err| (err, resumed))
457        }
458
459        async fn _reinitialize_and_upload_again<M: MultiPartsUploader>(
460            scheduler: &SerialMultiPartsUploaderScheduler<M>,
461            initialized: &mut M::AsyncInitializedParts,
462            reinitialize_options: ReinitializeOptions,
463        ) -> Option<ApiResult<Value>> {
464            OptionFuture::from(
465                scheduler
466                    .multi_parts_uploader
467                    .async_reinitialize_parts(initialized, reinitialize_options)
468                    .await
469                    .ok()
470                    .map(|_| _upload_after_reinitialize(scheduler, initialized)),
471            )
472            .await
473        }
474
475        async fn _upload_after_reinitialize<M: MultiPartsUploader>(
476            scheduler: &SerialMultiPartsUploaderScheduler<M>,
477            initialized: &M::AsyncInitializedParts,
478        ) -> ApiResult<Value> {
479            let mut parts = Vec::with_capacity(4);
480            while let Some(uploaded_part) = scheduler
481                .multi_parts_uploader
482                .async_upload_part(initialized, &scheduler.data_partition_provider)
483                .await?
484            {
485                parts.push(uploaded_part);
486            }
487            scheduler
488                .multi_parts_uploader
489                .async_complete_parts(initialized, &parts)
490                .await
491        }
492    }
493}
494
495#[cfg(feature = "async")]
496#[cfg(test)]
497mod tests {
498    use super::{
499        super::super::{
500            data_source::AsyncDigestible, AsyncFileDataSource, FileSystemResumableRecorder, MultiPartsV1Uploader,
501            MultiPartsV2Uploader, UploadManager, UploadTokenSigner,
502        },
503        *,
504    };
505    use anyhow::Result as AnyResult;
506    use async_std::task::{sleep, spawn as spawn_task};
507    use futures::{
508        io::{copy as async_io_copy, sink as async_io_sink},
509        AsyncRead,
510    };
511    use qiniu_apis::{
512        credential::Credential,
513        http::{
514            AsyncRequest, AsyncReset, AsyncResponse, AsyncResponseResult, HeaderValue, HttpCaller, StatusCode,
515            SyncRequest, SyncResponseResult,
516        },
517        http_client::{
518            AsyncResponseBody, DirectChooser, ErrorRetrier, HttpClient, LimitedRetrier, NeverRetrier, Region,
519            RequestRetrier, StaticRegionsProvider, NO_BACKOFF,
520        },
521    };
522    use qiniu_utils::base64::urlsafe as urlsafe_base64;
523    use rand::{thread_rng, RngCore};
524    use serde_json::{json, to_vec as json_to_vec};
525    use sha1::Sha1;
526    use std::{
527        io::{copy as io_copy, Read, Result as IoResult},
528        sync::{
529            atomic::{AtomicUsize, Ordering},
530            Arc,
531        },
532        time::{Duration, SystemTime, UNIX_EPOCH},
533    };
534    use tempfile::{Builder as TempfileBuilder, TempPath};
535    use text_io::scan as scan_text;
536
537    const BLOCK_SIZE: u64 = 4 << 20;
538
539    #[async_std::test]
540    async fn test_serial_multi_parts_uploader_scheduler_with_async_multi_parts_v1_upload_with_recovery() -> AnyResult<()>
541    {
542        env_logger::builder().is_test(true).try_init().ok();
543
544        #[derive(Debug, Default)]
545        struct FakeHttpCaller {
546            mkblk_counts: AtomicUsize,
547        }
548
549        impl HttpCaller for FakeHttpCaller {
550            fn call(&self, _request: &mut SyncRequest<'_>) -> SyncResponseResult {
551                unreachable!()
552            }
553
554            fn async_call<'a>(&'a self, request: &'a mut AsyncRequest<'_>) -> BoxFuture<'a, AsyncResponseResult> {
555                Box::pin(async move {
556                    if request.url().path().starts_with("/mkblk/") {
557                        let blk_size: u64;
558                        scan_text!(request.url().path().bytes() => "/mkblk/{}", blk_size);
559
560                        match blk_size {
561                            BLOCK_SIZE => {
562                                assert_eq!(self.mkblk_counts.fetch_add(1, Ordering::Relaxed), 0);
563                            }
564                            _ => unreachable!(),
565                        }
566                        let body_len = size_of_async_reader(request.body_mut()).await.unwrap();
567                        assert_eq!(body_len, blk_size);
568                        let resp_body = json_to_vec(&json!({
569                            "ctx": "===0===",
570                            "checksum": sha1_of_async_reader(request.body_mut()).await.unwrap(),
571                            "offset": blk_size,
572                            "host": "http://fakeexample.com",
573                            "expired_at": (SystemTime::now()+Duration::from_secs(3600)).duration_since(UNIX_EPOCH).unwrap().as_secs(),
574                        }))
575                        .unwrap();
576                        Ok(AsyncResponse::builder()
577                            .status_code(StatusCode::OK)
578                            .header("x-reqid", HeaderValue::from_static("FakeReqid"))
579                            .body(AsyncResponseBody::from_bytes(resp_body))
580                            .build())
581                    } else if request.url().path().starts_with("/mkfile/") {
582                        let resp_body = json_to_vec(&json!({
583                            "error": "test error",
584                        }))
585                        .unwrap();
586                        Ok(AsyncResponse::builder()
587                            .status_code(StatusCode::BAD_REQUEST)
588                            .header("x-reqid", HeaderValue::from_static("FakeReqid"))
589                            .body(AsyncResponseBody::from_bytes(resp_body))
590                            .build())
591                    } else {
592                        unreachable!()
593                    }
594                })
595            }
596        }
597
598        let resuming_files_dir = TempfileBuilder::new().tempdir()?;
599        let file_path = spawn_task(async { random_file_path(BLOCK_SIZE) }).await?;
600
601        {
602            let uploader = SerialMultiPartsUploaderScheduler::new(MultiPartsV1Uploader::new(
603                get_upload_manager(FakeHttpCaller::default()),
604                FileSystemResumableRecorder::<Sha1>::new(resuming_files_dir.path()),
605            ));
606            let file_source = Box::new(AsyncFileDataSource::new(file_path.as_os_str()));
607            let params = ObjectParams::builder()
608                .region_provider(single_up_domain_region())
609                .build();
610            uploader.async_upload(file_source, params).await.unwrap_err();
611        }
612
613        #[derive(Debug, Default)]
614        struct FakeHttpCaller2 {
615            mkblk_counts: AtomicUsize,
616            mkfile_counts: AtomicUsize,
617        }
618
619        impl HttpCaller for FakeHttpCaller2 {
620            fn call(&self, _request: &mut SyncRequest<'_>) -> SyncResponseResult {
621                unreachable!()
622            }
623
624            fn async_call<'a>(&'a self, request: &'a mut AsyncRequest<'_>) -> BoxFuture<'a, AsyncResponseResult> {
625                Box::pin(async move {
626                    if request.url().path().starts_with("/mkblk/") {
627                        let blk_size: u64;
628                        scan_text!(request.url().path().bytes() => "/mkblk/{}", blk_size);
629
630                        match blk_size {
631                            BLOCK_SIZE => {
632                                assert_eq!(self.mkblk_counts.fetch_add(1, Ordering::Relaxed), 0);
633                            }
634                            _ => unreachable!(),
635                        }
636                        let body_len = size_of_async_reader(request.body_mut()).await.unwrap();
637                        assert_eq!(body_len, blk_size);
638                        let resp_body = json_to_vec(&json!({
639                            "ctx": "===0===",
640                            "checksum": sha1_of_async_reader(request.body_mut()).await.unwrap(),
641                            "offset": blk_size,
642                            "host": "http://fakeexample.com",
643                            "expired_at": (SystemTime::now()+Duration::from_secs(5)).duration_since(UNIX_EPOCH).unwrap().as_secs(),
644                        }))
645                        .unwrap();
646                        Ok(AsyncResponse::builder()
647                            .status_code(StatusCode::OK)
648                            .header("x-reqid", HeaderValue::from_static("FakeReqid"))
649                            .body(AsyncResponseBody::from_bytes(resp_body))
650                            .build())
651                    } else if request.url().path().starts_with("/mkfile/") {
652                        assert!(self.mkfile_counts.fetch_add(1, Ordering::Relaxed) < 2);
653                        let resp_body = json_to_vec(&json!({
654                            "error": "invalid ctx",
655                        }))
656                        .unwrap();
657                        Ok(AsyncResponse::builder()
658                            .status_code(StatusCode::from_u16(701).unwrap())
659                            .header("x-reqid", HeaderValue::from_static("FakeReqid"))
660                            .body(AsyncResponseBody::from_bytes(resp_body))
661                            .build())
662                    } else {
663                        unreachable!()
664                    }
665                })
666            }
667        }
668
669        {
670            let caller = Arc::new(FakeHttpCaller2::default());
671            {
672                let uploader = SerialMultiPartsUploaderScheduler::new(MultiPartsV1Uploader::new(
673                    get_upload_manager(caller.to_owned()),
674                    FileSystemResumableRecorder::<Sha1>::new(resuming_files_dir.path()),
675                ));
676                let file_source = Box::new(AsyncFileDataSource::new(file_path.as_os_str()));
677                let params = ObjectParams::builder()
678                    .region_provider(single_up_domain_region())
679                    .build();
680                uploader.async_upload(file_source, params).await.unwrap_err();
681            }
682            let caller = Arc::try_unwrap(caller).unwrap();
683            assert_eq!(caller.mkblk_counts.into_inner(), 1);
684            assert_eq!(caller.mkfile_counts.into_inner(), 2);
685        }
686
687        sleep(Duration::from_secs(5)).await;
688
689        #[derive(Debug, Default)]
690        struct FakeHttpCaller3 {
691            mkblk_counts: AtomicUsize,
692            mkfile_counts: AtomicUsize,
693        }
694
695        impl HttpCaller for FakeHttpCaller3 {
696            fn call(&self, _request: &mut SyncRequest<'_>) -> SyncResponseResult {
697                unreachable!()
698            }
699
700            fn async_call<'a>(&'a self, request: &'a mut AsyncRequest<'_>) -> BoxFuture<'a, AsyncResponseResult> {
701                Box::pin(async move {
702                    if request.url().path().starts_with("/mkblk/") {
703                        let blk_size: u64;
704                        scan_text!(request.url().path().bytes() => "/mkblk/{}", blk_size);
705
706                        match blk_size {
707                            BLOCK_SIZE => {
708                                assert_eq!(self.mkblk_counts.fetch_add(1, Ordering::Relaxed), 0);
709                            }
710                            _ => unreachable!(),
711                        }
712                        let body_len = size_of_async_reader(request.body_mut()).await.unwrap();
713                        assert_eq!(body_len, blk_size);
714                        let resp_body = json_to_vec(&json!({
715                            "ctx": "===0===",
716                            "checksum": sha1_of_async_reader(request.body_mut()).await.unwrap(),
717                            "offset": blk_size,
718                            "host": "http://fakeexample.com",
719                            "expired_at": (SystemTime::now()+Duration::from_secs(3600)).duration_since(UNIX_EPOCH).unwrap().as_secs(),
720                        }))
721                        .unwrap();
722                        Ok(AsyncResponse::builder()
723                            .status_code(StatusCode::OK)
724                            .header("x-reqid", HeaderValue::from_static("FakeReqid"))
725                            .body(AsyncResponseBody::from_bytes(resp_body))
726                            .build())
727                    } else if request.url().path().starts_with("/mkfile/") {
728                        assert!(self.mkfile_counts.fetch_add(1, Ordering::Relaxed) < 2);
729                        let resp_body = json_to_vec(&json!({
730                            "ok": 1,
731                        }))
732                        .unwrap();
733                        Ok(AsyncResponse::builder()
734                            .status_code(StatusCode::OK)
735                            .header("x-reqid", HeaderValue::from_static("FakeReqid"))
736                            .body(AsyncResponseBody::from_bytes(resp_body))
737                            .build())
738                    } else {
739                        unreachable!()
740                    }
741                })
742            }
743        }
744
745        {
746            let caller = Arc::new(FakeHttpCaller3::default());
747            {
748                let uploader = SerialMultiPartsUploaderScheduler::new(MultiPartsV1Uploader::new(
749                    get_upload_manager(caller.to_owned()),
750                    FileSystemResumableRecorder::<Sha1>::new(resuming_files_dir.path()),
751                ));
752                let file_source = Box::new(AsyncFileDataSource::new(file_path.as_os_str()));
753                let params = ObjectParams::builder()
754                    .region_provider(single_up_domain_region())
755                    .build();
756                let body = uploader.async_upload(file_source, params).await.unwrap();
757                assert_eq!(body.get("ok").unwrap().as_i64(), Some(1));
758            }
759            let caller = Arc::try_unwrap(caller).unwrap();
760            assert_eq!(caller.mkblk_counts.into_inner(), 1);
761            assert_eq!(caller.mkfile_counts.into_inner(), 1);
762        }
763
764        Ok(())
765    }
766
767    #[async_std::test]
768    async fn test_serial_multi_parts_uploader_scheduler_with_async_multi_parts_v2_upload_with_recovery() -> AnyResult<()>
769    {
770        env_logger::builder().is_test(true).try_init().ok();
771
772        #[derive(Debug, Default)]
773        struct FakeHttpCaller {
774            init_parts_counts: AtomicUsize,
775            upload_part_counts: AtomicUsize,
776        }
777
778        impl HttpCaller for FakeHttpCaller {
779            fn call(&self, _request: &mut SyncRequest<'_>) -> SyncResponseResult {
780                unreachable!()
781            }
782
783            #[cfg(feature = "async")]
784            fn async_call<'a>(&'a self, request: &'a mut AsyncRequest<'_>) -> BoxFuture<'a, AsyncResponseResult> {
785                Box::pin(async move {
786                    if request.url().path() == "/buckets/fakebucket/objects/~/uploads" {
787                        assert_eq!(self.upload_part_counts.load(Ordering::Relaxed), 0);
788                        assert_eq!(self.init_parts_counts.fetch_add(1, Ordering::Relaxed), 0);
789                        let resp_body = json_to_vec(&json!({
790                                "uploadId": "fakeuploadid",
791                                "expireAt": (SystemTime::now() + Duration::from_secs(3600)).duration_since(UNIX_EPOCH).unwrap().as_secs(),
792                            }))
793                            .unwrap();
794                        Ok(AsyncResponse::builder()
795                            .status_code(StatusCode::OK)
796                            .header("x-reqid", HeaderValue::from_static("FakeReqid"))
797                            .body(AsyncResponseBody::from_bytes(resp_body))
798                            .build())
799                    } else if request
800                        .url()
801                        .path()
802                        .starts_with("/buckets/fakebucket/objects/~/uploads/fakeuploadid/")
803                    {
804                        let page_number: usize;
805                        scan_text!(request.url().path().bytes() => "/buckets/fakebucket/objects/~/uploads/fakeuploadid/{}", page_number);
806                        assert_eq!(page_number, 1);
807                        let body_len = size_of_async_reader(request.body_mut()).await.unwrap();
808                        assert_eq!(body_len, BLOCK_SIZE);
809                        assert_eq!(self.upload_part_counts.fetch_add(1, Ordering::Relaxed), 0);
810                        let resp_body = json_to_vec(&json!({
811                            "etag": format!("==={page_number}==="),
812                            "md5": "fake-md5",
813                        }))
814                        .unwrap();
815                        Ok(AsyncResponse::builder()
816                            .status_code(StatusCode::OK)
817                            .header("x-reqid", HeaderValue::from_static("FakeReqid"))
818                            .body(AsyncResponseBody::from_bytes(resp_body))
819                            .build())
820                    } else if request
821                        .url()
822                        .path()
823                        .starts_with("/buckets/fakebucket/objects/~/uploads/fakeuploadid")
824                    {
825                        let resp_body = json_to_vec(&json!({
826                            "error": "test error",
827                        }))
828                        .unwrap();
829                        Ok(AsyncResponse::builder()
830                            .status_code(StatusCode::BAD_REQUEST)
831                            .header("x-reqid", HeaderValue::from_static("FakeReqid"))
832                            .body(AsyncResponseBody::from_bytes(resp_body))
833                            .build())
834                    } else {
835                        unreachable!()
836                    }
837                })
838            }
839        }
840
841        let resuming_files_dir = TempfileBuilder::new().tempdir()?;
842        let file_path = spawn_task(async { random_file_path(BLOCK_SIZE) }).await?;
843
844        {
845            let caller = Arc::new(FakeHttpCaller::default());
846            {
847                let uploader = SerialMultiPartsUploaderScheduler::new(MultiPartsV2Uploader::new(
848                    get_upload_manager(caller.to_owned()),
849                    FileSystemResumableRecorder::<Sha1>::new(resuming_files_dir.path()),
850                ));
851                let file_source = Box::new(AsyncFileDataSource::new(file_path.as_os_str()));
852                let params = ObjectParams::builder()
853                    .region_provider(single_up_domain_region())
854                    .build();
855                uploader.async_upload(file_source, params).await.unwrap_err();
856            }
857            let caller = Arc::try_unwrap(caller).unwrap();
858            assert_eq!(caller.init_parts_counts.into_inner(), 1);
859            assert_eq!(caller.upload_part_counts.into_inner(), 1);
860        }
861
862        #[derive(Debug, Default)]
863        struct FakeHttpCaller2 {
864            init_parts_counts: AtomicUsize,
865            upload_part_counts: AtomicUsize,
866        }
867
868        impl HttpCaller for FakeHttpCaller2 {
869            fn call(&self, _request: &mut SyncRequest<'_>) -> SyncResponseResult {
870                unreachable!()
871            }
872
873            #[cfg(feature = "async")]
874            fn async_call<'a>(&'a self, request: &'a mut AsyncRequest<'_>) -> BoxFuture<'a, AsyncResponseResult> {
875                Box::pin(async move {
876                    if request.url().path() == "/buckets/fakebucket/objects/~/uploads" {
877                        assert_eq!(self.upload_part_counts.load(Ordering::Relaxed), 0);
878                        assert_eq!(self.init_parts_counts.fetch_add(1, Ordering::Relaxed), 0);
879                        let resp_body = json_to_vec(&json!({
880                                "uploadId": "fakeuploadid",
881                                "expireAt": (SystemTime::now() + Duration::from_secs(5)).duration_since(UNIX_EPOCH).unwrap().as_secs(),
882                            }))
883                            .unwrap();
884                        Ok(AsyncResponse::builder()
885                            .status_code(StatusCode::OK)
886                            .header("x-reqid", HeaderValue::from_static("FakeReqid"))
887                            .body(AsyncResponseBody::from_bytes(resp_body))
888                            .build())
889                    } else if request
890                        .url()
891                        .path()
892                        .starts_with("/buckets/fakebucket/objects/~/uploads/fakeuploadid/")
893                    {
894                        let page_number: usize;
895                        scan_text!(request.url().path().bytes() => "/buckets/fakebucket/objects/~/uploads/fakeuploadid/{}", page_number);
896                        assert_eq!(page_number, 1);
897                        let body_len = size_of_async_reader(request.body_mut()).await.unwrap();
898                        assert_eq!(body_len, BLOCK_SIZE);
899                        assert_eq!(self.upload_part_counts.fetch_add(1, Ordering::Relaxed), 0);
900                        let resp_body = json_to_vec(&json!({
901                            "etag": format!("==={page_number}==="),
902                            "md5": "fake-md5",
903                        }))
904                        .unwrap();
905                        Ok(AsyncResponse::builder()
906                            .status_code(StatusCode::OK)
907                            .header("x-reqid", HeaderValue::from_static("FakeReqid"))
908                            .body(AsyncResponseBody::from_bytes(resp_body))
909                            .build())
910                    } else if request
911                        .url()
912                        .path()
913                        .starts_with("/buckets/fakebucket/objects/~/uploads/fakeuploadid")
914                    {
915                        let resp_body = json_to_vec(&json!({
916                            "error": "no such uploadId",
917                        }))
918                        .unwrap();
919                        Ok(AsyncResponse::builder()
920                            .status_code(StatusCode::from_u16(612).unwrap())
921                            .header("x-reqid", HeaderValue::from_static("FakeReqid"))
922                            .body(AsyncResponseBody::from_bytes(resp_body))
923                            .build())
924                    } else {
925                        unreachable!()
926                    }
927                })
928            }
929        }
930
931        {
932            let caller = Arc::new(FakeHttpCaller2::default());
933            {
934                let uploader = SerialMultiPartsUploaderScheduler::new(MultiPartsV2Uploader::new(
935                    get_upload_manager(caller.to_owned()),
936                    FileSystemResumableRecorder::<Sha1>::new(resuming_files_dir.path()),
937                ));
938                let file_source = Box::new(AsyncFileDataSource::new(file_path.as_os_str()));
939                let params = ObjectParams::builder()
940                    .region_provider(single_up_domain_region())
941                    .build();
942                uploader.async_upload(file_source, params).await.unwrap_err();
943            }
944            let caller = Arc::try_unwrap(caller).unwrap();
945            assert_eq!(caller.init_parts_counts.into_inner(), 1);
946            assert_eq!(caller.upload_part_counts.into_inner(), 1);
947        }
948
949        sleep(Duration::from_secs(5)).await;
950
951        #[derive(Debug, Default)]
952        struct FakeHttpCaller3 {
953            init_parts_counts: AtomicUsize,
954            upload_part_counts: AtomicUsize,
955        }
956
957        impl HttpCaller for FakeHttpCaller3 {
958            fn call(&self, _request: &mut SyncRequest<'_>) -> SyncResponseResult {
959                unreachable!()
960            }
961
962            #[cfg(feature = "async")]
963            fn async_call<'a>(&'a self, request: &'a mut AsyncRequest<'_>) -> BoxFuture<'a, AsyncResponseResult> {
964                Box::pin(async move {
965                    if request.url().path() == "/buckets/fakebucket/objects/~/uploads" {
966                        assert_eq!(self.upload_part_counts.load(Ordering::Relaxed), 0);
967                        assert_eq!(self.init_parts_counts.fetch_add(1, Ordering::Relaxed), 0);
968                        let resp_body = json_to_vec(&json!({
969                                "uploadId": "fakeuploadid",
970                                "expireAt": (SystemTime::now() + Duration::from_secs(5)).duration_since(UNIX_EPOCH).unwrap().as_secs(),
971                            }))
972                            .unwrap();
973                        Ok(AsyncResponse::builder()
974                            .status_code(StatusCode::OK)
975                            .header("x-reqid", HeaderValue::from_static("FakeReqid"))
976                            .body(AsyncResponseBody::from_bytes(resp_body))
977                            .build())
978                    } else if request
979                        .url()
980                        .path()
981                        .starts_with("/buckets/fakebucket/objects/~/uploads/fakeuploadid/")
982                    {
983                        let page_number: usize;
984                        scan_text!(request.url().path().bytes() => "/buckets/fakebucket/objects/~/uploads/fakeuploadid/{}", page_number);
985                        assert_eq!(page_number, 1);
986                        let body_len = size_of_async_reader(request.body_mut()).await.unwrap();
987                        assert_eq!(body_len, BLOCK_SIZE);
988                        assert_eq!(self.upload_part_counts.fetch_add(1, Ordering::Relaxed), 0);
989                        let resp_body = json_to_vec(&json!({
990                            "etag": format!("==={page_number}==="),
991                            "md5": "fake-md5",
992                        }))
993                        .unwrap();
994                        Ok(AsyncResponse::builder()
995                            .status_code(StatusCode::OK)
996                            .header("x-reqid", HeaderValue::from_static("FakeReqid"))
997                            .body(AsyncResponseBody::from_bytes(resp_body))
998                            .build())
999                    } else if request
1000                        .url()
1001                        .path()
1002                        .starts_with("/buckets/fakebucket/objects/~/uploads/fakeuploadid")
1003                    {
1004                        let resp_body = json_to_vec(&json!({
1005                            "ok": 1,
1006                        }))
1007                        .unwrap();
1008                        Ok(AsyncResponse::builder()
1009                            .status_code(StatusCode::OK)
1010                            .header("x-reqid", HeaderValue::from_static("FakeReqid"))
1011                            .body(AsyncResponseBody::from_bytes(resp_body))
1012                            .build())
1013                    } else {
1014                        unreachable!()
1015                    }
1016                })
1017            }
1018        }
1019
1020        {
1021            let caller = Arc::new(FakeHttpCaller3::default());
1022            {
1023                let uploader = SerialMultiPartsUploaderScheduler::new(MultiPartsV2Uploader::new(
1024                    get_upload_manager(caller.to_owned()),
1025                    FileSystemResumableRecorder::<Sha1>::new(resuming_files_dir.path()),
1026                ));
1027                let file_source = Box::new(AsyncFileDataSource::new(file_path.as_os_str()));
1028                let params = ObjectParams::builder()
1029                    .region_provider(single_up_domain_region())
1030                    .build();
1031                let body = uploader.async_upload(file_source, params).await.unwrap();
1032                assert_eq!(body.get("ok").unwrap().as_i64(), Some(1));
1033            }
1034            let caller = Arc::try_unwrap(caller).unwrap();
1035            assert_eq!(caller.init_parts_counts.into_inner(), 1);
1036            assert_eq!(caller.upload_part_counts.into_inner(), 1);
1037        }
1038
1039        #[derive(Debug, Default)]
1040        struct FakeHttpCaller4 {
1041            init_parts_counts: AtomicUsize,
1042            upload_part_counts: AtomicUsize,
1043            complete_parts_counts: AtomicUsize,
1044        }
1045
1046        impl HttpCaller for FakeHttpCaller4 {
1047            fn call(&self, _request: &mut SyncRequest<'_>) -> SyncResponseResult {
1048                unreachable!()
1049            }
1050
1051            #[cfg(feature = "async")]
1052            fn async_call<'a>(&'a self, request: &'a mut AsyncRequest<'_>) -> BoxFuture<'a, AsyncResponseResult> {
1053                Box::pin(async move {
1054                    if request.url().path() == "/buckets/fakebucket/objects/~/uploads" {
1055                        if request.url().host() == Some("fakeup.example.com") {
1056                            assert_eq!(self.init_parts_counts.fetch_add(1, Ordering::Relaxed), 0);
1057                            assert_eq!(self.upload_part_counts.load(Ordering::Relaxed), 0);
1058                            assert_eq!(self.complete_parts_counts.load(Ordering::Relaxed), 0);
1059                        } else {
1060                            assert_eq!(self.init_parts_counts.fetch_add(1, Ordering::Relaxed), 1);
1061                            assert_eq!(self.upload_part_counts.load(Ordering::Relaxed), 1);
1062                            assert_eq!(self.complete_parts_counts.load(Ordering::Relaxed), 1);
1063                        }
1064                        let resp_body = json_to_vec(&json!({
1065                                "uploadId": "fakeuploadid",
1066                                "expireAt": (SystemTime::now() + Duration::from_secs(5)).duration_since(UNIX_EPOCH).unwrap().as_secs(),
1067                            }))
1068                            .unwrap();
1069                        Ok(AsyncResponse::builder()
1070                            .status_code(StatusCode::OK)
1071                            .header("x-reqid", HeaderValue::from_static("FakeReqid"))
1072                            .body(AsyncResponseBody::from_bytes(resp_body))
1073                            .build())
1074                    } else if request
1075                        .url()
1076                        .path()
1077                        .starts_with("/buckets/fakebucket/objects/~/uploads/fakeuploadid/")
1078                    {
1079                        let page_number: usize;
1080                        scan_text!(request.url().path().bytes() => "/buckets/fakebucket/objects/~/uploads/fakeuploadid/{}", page_number);
1081                        assert_eq!(page_number, 1);
1082                        let body_len = size_of_async_reader(request.body_mut()).await.unwrap();
1083                        assert_eq!(body_len, BLOCK_SIZE);
1084                        if request.url().host() == Some("fakeup.example.com") {
1085                            assert_eq!(self.init_parts_counts.load(Ordering::Relaxed), 1);
1086                            assert_eq!(self.upload_part_counts.fetch_add(1, Ordering::Relaxed), 0);
1087                            assert_eq!(self.complete_parts_counts.load(Ordering::Relaxed), 0);
1088                        } else {
1089                            assert_eq!(self.init_parts_counts.load(Ordering::Relaxed), 2);
1090                            assert_eq!(self.upload_part_counts.fetch_add(1, Ordering::Relaxed), 1);
1091                            assert_eq!(self.complete_parts_counts.load(Ordering::Relaxed), 1);
1092                        }
1093                        let resp_body = json_to_vec(&json!({
1094                            "etag": format!("==={page_number}==="),
1095                            "md5": "fake-md5",
1096                        }))
1097                        .unwrap();
1098                        Ok(AsyncResponse::builder()
1099                            .status_code(StatusCode::OK)
1100                            .header("x-reqid", HeaderValue::from_static("FakeReqid"))
1101                            .body(AsyncResponseBody::from_bytes(resp_body))
1102                            .build())
1103                    } else if request
1104                        .url()
1105                        .path()
1106                        .starts_with("/buckets/fakebucket/objects/~/uploads/fakeuploadid")
1107                    {
1108                        if request.url().host() == Some("fakeup.example.com") {
1109                            assert_eq!(self.init_parts_counts.load(Ordering::Relaxed), 1);
1110                            assert_eq!(self.upload_part_counts.load(Ordering::Relaxed), 1);
1111                            assert_eq!(self.complete_parts_counts.fetch_add(1, Ordering::Relaxed), 0);
1112                            let resp_body = json_to_vec(&json!({
1113                                "error": "test error",
1114                            }))
1115                            .unwrap();
1116                            Ok(AsyncResponse::builder()
1117                                .status_code(StatusCode::from_u16(599).unwrap())
1118                                .header("x-reqid", HeaderValue::from_static("FakeReqid"))
1119                                .body(AsyncResponseBody::from_bytes(resp_body))
1120                                .build())
1121                        } else {
1122                            assert_eq!(self.init_parts_counts.load(Ordering::Relaxed), 2);
1123                            assert_eq!(self.upload_part_counts.load(Ordering::Relaxed), 2);
1124                            assert_eq!(self.complete_parts_counts.fetch_add(1, Ordering::Relaxed), 1);
1125                            let resp_body = json_to_vec(&json!({
1126                                "ok": 1,
1127                            }))
1128                            .unwrap();
1129                            Ok(AsyncResponse::builder()
1130                                .status_code(StatusCode::OK)
1131                                .header("x-reqid", HeaderValue::from_static("FakeReqid"))
1132                                .body(AsyncResponseBody::from_bytes(resp_body))
1133                                .build())
1134                        }
1135                    } else {
1136                        unreachable!()
1137                    }
1138                })
1139            }
1140        }
1141
1142        {
1143            let caller = Arc::new(FakeHttpCaller4::default());
1144            {
1145                let uploader = SerialMultiPartsUploaderScheduler::new(MultiPartsV2Uploader::new(
1146                    get_upload_manager_with_retrier(caller.to_owned(), LimitedRetrier::new(ErrorRetrier, 0)),
1147                    FileSystemResumableRecorder::<Sha1>::new(resuming_files_dir.path()),
1148                ));
1149                let file_source = Box::new(AsyncFileDataSource::new(file_path.as_os_str()));
1150                let params = ObjectParams::builder()
1151                    .region_provider(double_up_domain_region())
1152                    .build();
1153                let body = uploader.async_upload(file_source, params).await.unwrap();
1154                assert_eq!(body.get("ok").unwrap().as_i64(), Some(1));
1155            }
1156            let caller = Arc::try_unwrap(caller).unwrap();
1157            assert_eq!(caller.init_parts_counts.into_inner(), 2);
1158            assert_eq!(caller.upload_part_counts.into_inner(), 2);
1159            assert_eq!(caller.complete_parts_counts.into_inner(), 2);
1160        }
1161
1162        Ok(())
1163    }
1164
1165    async fn size_of_async_reader<R: AsyncRead + AsyncReset + Unpin>(mut reader: &mut R) -> IoResult<u64> {
1166        let size = async_io_copy(&mut reader, &mut async_io_sink()).await?;
1167        reader.reset().await?;
1168        Ok(size)
1169    }
1170
1171    async fn sha1_of_async_reader<R: AsyncRead + AsyncReset + Unpin + Send>(reader: &mut R) -> IoResult<String> {
1172        Ok(urlsafe_base64(
1173            AsyncDigestible::<Sha1>::digest(reader).await?.as_slice(),
1174        ))
1175    }
1176
1177    fn get_upload_manager(caller: impl HttpCaller + 'static) -> UploadManager {
1178        get_upload_manager_with_retrier(caller, NeverRetrier)
1179    }
1180
1181    fn get_upload_manager_with_retrier(
1182        caller: impl HttpCaller + 'static,
1183        retrier: impl RequestRetrier + 'static,
1184    ) -> UploadManager {
1185        UploadManager::builder(UploadTokenSigner::new_credential_provider(
1186            get_credential(),
1187            "fakebucket",
1188            Duration::from_secs(100),
1189        ))
1190        .http_client(
1191            HttpClient::builder(caller)
1192                .chooser(DirectChooser)
1193                .request_retrier(retrier)
1194                .backoff(NO_BACKOFF)
1195                .build(),
1196        )
1197        .build()
1198    }
1199
1200    fn get_credential() -> Credential {
1201        Credential::new("fakeaccesskey", "fakesecretkey")
1202    }
1203
1204    fn single_up_domain_region() -> Region {
1205        Region::builder("chaotic")
1206            .add_up_preferred_endpoint(("fakeup.example.com".to_owned(), 8080).into())
1207            .build()
1208    }
1209
1210    fn double_up_domain_region() -> StaticRegionsProvider {
1211        let mut provider = StaticRegionsProvider::new(single_up_domain_region());
1212        provider.append(
1213            Region::builder("chaotic2")
1214                .add_up_preferred_endpoint(("fakeup.example2.com".to_owned(), 8080).into())
1215                .build(),
1216        );
1217        provider
1218    }
1219
1220    fn random_file_path(size: u64) -> IoResult<TempPath> {
1221        let mut tempfile = TempfileBuilder::new().tempfile()?;
1222        let rng = Box::new(thread_rng()) as Box<dyn RngCore>;
1223        io_copy(&mut rng.take(size), &mut tempfile)?;
1224        Ok(tempfile.into_temp_path())
1225    }
1226}