qiniu_upload/
uploader.rs

1use crate::{
2    config::{build_uploader_builder_from_env, on_config_updated},
3    credential::{CredentialProvider, StaticCredentialProvider},
4    error::{HttpCallError, HttpCallResult},
5    host_selector::HostSelector,
6    query::HostsQuerier,
7    reader::{FormUploadSource, UploadSource},
8    upload_apis::{
9        CompletePartInfo, CompletePartsRequest, CompletePartsRequestBody, FormUploadRequest,
10        InitPartsRequest, UploadApiCaller, UploadPartRequest,
11    },
12    upload_policy::UploadPolicy,
13    upload_token::{
14        BucketUploadTokenProvider, ObjectUploadTokenProvider, ParseResult, UploadTokenProvider,
15    },
16};
17use log::{error, info};
18use once_cell::sync::Lazy;
19use positioned_io::{Cursor, Size};
20use reqwest::StatusCode;
21use serde_json::Value as JSONValue;
22use std::{
23    borrow::Cow,
24    collections::HashMap,
25    fs::File,
26    io::{Read, Result as IOResult},
27    mem::take,
28    path::Path,
29    sync::{Arc, RwLock},
30    time::{Duration, Instant},
31};
32use tap::{Tap, TapFallible};
33
34/// 对象上传器
35#[derive(Debug, Clone)]
36pub struct Uploader {
37    inner: Arc<UploaderInner>,
38}
39
40#[derive(Debug)]
41struct UploaderInner {
42    api_caller: UploadApiCaller,
43    bucket_name: String,
44    base_timeout: Duration,
45    credential: Arc<dyn CredentialProvider>,
46    part_size: u64,
47}
48
49/// 上传进度回调函数
50pub type UploadProgressCallback =
51    Box<dyn Fn(&UploadProgressInfo) -> HttpCallResult<()> + Send + Sync + 'static>;
52
53/// 对象上传构建器
54#[derive(Debug)]
55pub struct UploaderBuilder {
56    credential: Arc<dyn CredentialProvider>,
57    access_key: String,
58    bucket: String,
59    up_urls: Vec<String>,
60    uc_urls: Vec<String>,
61    up_tries: usize,
62    up_timeout_multiple_percent: u32,
63    uc_tries: usize,
64    uc_timeout_multiple_percent: u32,
65    part_size: u64,
66    use_https: bool,
67    update_interval: Duration,
68    punish_duration: Duration,
69    base_timeout: Duration,
70    max_punished_times: usize,
71    max_punished_hosts_percent: u8,
72}
73
74impl UploaderBuilder {
75    /// 新建上传构建器
76    #[inline]
77    pub fn new(
78        access_key: impl Into<String>,
79        secret_key: impl Into<String>,
80        bucket: impl Into<String>,
81    ) -> Self {
82        let access_key = access_key.into();
83        Self {
84            credential: Arc::new(StaticCredentialProvider::new(
85                access_key.to_owned(),
86                secret_key.into(),
87            )),
88            bucket: bucket.into(),
89            access_key,
90            up_urls: Default::default(),
91            uc_urls: Default::default(),
92            up_tries: 10,
93            up_timeout_multiple_percent: 1000,
94            uc_tries: 10,
95            uc_timeout_multiple_percent: 100,
96            part_size: 1 << 22,
97            use_https: false,
98            update_interval: Duration::from_secs(60),
99            punish_duration: Duration::from_secs(30 * 60),
100            base_timeout: Duration::from_secs(30),
101            max_punished_times: 5,
102            max_punished_hosts_percent: 50,
103        }
104    }
105
106    /// 设置七牛 UP 服务器 URL 列表
107    #[inline]
108    pub fn up_urls(mut self, up_urls: Vec<String>) -> Self {
109        self.up_urls = up_urls;
110        self
111    }
112
113    /// 设置七牛 UC 服务器 URL 列表
114    #[inline]
115    pub fn uc_urls(mut self, uc_urls: Vec<String>) -> Self {
116        self.uc_urls = uc_urls;
117        self
118    }
119
120    /// 设置对象上传最大尝试次数
121    #[inline]
122    pub fn up_tries(mut self, up_tries: usize) -> Self {
123        self.up_tries = up_tries;
124        self
125    }
126
127    /// 设置上传超时时长倍数百分比
128    #[inline]
129    pub fn up_timeout_multiple(mut self, up_timeout_multiple: u32) -> Self {
130        self.up_timeout_multiple_percent = up_timeout_multiple;
131        self
132    }
133
134    /// 设置 UC 查询的最大尝试次数
135    #[inline]
136    pub fn uc_tries(mut self, uc_tries: usize) -> Self {
137        self.uc_tries = uc_tries;
138        self
139    }
140
141    /// 设置 UC 查询超时时长倍数百分比
142    #[inline]
143    pub fn uc_timeout_multiple(mut self, uc_timeout_multiple: u32) -> Self {
144        self.uc_timeout_multiple_percent = uc_timeout_multiple;
145        self
146    }
147
148    /// 设置是否使用 HTTPS 协议来访问 UP / UC 服务器
149    #[inline]
150    pub fn use_https(mut self, use_https: bool) -> Self {
151        self.use_https = use_https;
152        self
153    }
154
155    /// 设置 UC 查询的频率
156    #[inline]
157    pub fn update_interval(mut self, update_interval: Duration) -> Self {
158        self.update_interval = update_interval;
159        self
160    }
161
162    /// 设置域名访问失败后的惩罚时长
163    #[inline]
164    pub fn punish_duration(mut self, punish_duration: Duration) -> Self {
165        self.punish_duration = punish_duration;
166        self
167    }
168
169    /// 设置域名访问的基础超时时长
170    #[inline]
171    pub fn base_timeout(mut self, base_timeout: Duration) -> Self {
172        self.base_timeout = base_timeout;
173        self
174    }
175
176    /// 配置默认上传分片大小,单位为字节
177    #[inline]
178    pub fn part_size(mut self, part_size: u64) -> Self {
179        self.part_size = part_size;
180        self
181    }
182
183    /// 设置失败域名的最大重试次数
184    ///
185    /// 一旦一个域名的被惩罚次数超过限制,则域名选择器不会选择该域名,除非被惩罚的域名比例超过上限,或惩罚时长超过指定时长
186    #[inline]
187    pub fn max_punished_times(mut self, max_punished_times: usize) -> Self {
188        self.max_punished_times = max_punished_times;
189        self
190    }
191
192    /// 设置被惩罚的域名最大比例
193    ///
194    /// 域名选择器在搜索域名时,一旦被跳过的域名比例大于该值,则下一个域名将被选中,不管该域名是否也被惩罚。一旦该域名成功,则惩罚将立刻被取消
195    #[inline]
196    pub fn max_punished_hosts_percent(mut self, max_punished_hosts_percent: u8) -> Self {
197        self.max_punished_hosts_percent = max_punished_hosts_percent;
198        self
199    }
200
201    /// 构建对象上传器
202    #[inline]
203    pub fn build(self) -> Uploader {
204        let up_querier = if self.uc_urls.is_empty() {
205            None
206        } else {
207            Some(HostsQuerier::new(
208                HostSelector::builder(self.uc_urls)
209                    .update_interval(self.update_interval)
210                    .punish_duration(self.punish_duration)
211                    .max_punished_times(self.max_punished_times)
212                    .max_punished_hosts_percent(self.max_punished_hosts_percent)
213                    .base_timeout(self.base_timeout * self.uc_timeout_multiple_percent / 100)
214                    .build(),
215                self.uc_tries,
216            ))
217        };
218        let up_selector = {
219            let access_key = self.access_key;
220            let bucket = self.bucket.to_owned();
221            let use_https = self.use_https;
222            HostSelector::builder(self.up_urls)
223                .update_callback(Box::new(move || {
224                    if let Some(up_querier) = &up_querier {
225                        up_querier.query_for_up_urls(&access_key, &bucket, use_https)
226                    } else {
227                        Ok(vec![])
228                    }
229                }))
230                .should_punish_callback(Box::new(|err| match err {
231                    HttpCallError::ReqwestError(err) if err.is_builder() => false,
232                    HttpCallError::StatusCodeError(err) => {
233                        !is_client_error_status(err.status_code())
234                    }
235                    _ => true,
236                }))
237                .update_interval(self.update_interval)
238                .punish_duration(self.punish_duration)
239                .max_punished_times(self.max_punished_times)
240                .max_punished_hosts_percent(self.max_punished_hosts_percent)
241                .base_timeout(self.base_timeout * self.up_timeout_multiple_percent / 100)
242                .build()
243        };
244
245        Uploader {
246            inner: Arc::new(UploaderInner {
247                api_caller: UploadApiCaller::new(up_selector, self.up_tries),
248                bucket_name: self.bucket,
249                part_size: self.part_size,
250                base_timeout: self.base_timeout,
251                credential: self.credential,
252            }),
253        }
254    }
255
256    /// 从环境变量创建对象上传构建器
257    #[inline]
258    pub fn from_env() -> Option<Self> {
259        build_uploader_builder_from_env()
260    }
261}
262
263impl Uploader {
264    #[inline]
265    /// 从环境变量创建对象上传器
266    pub fn from_env() -> Option<Self> {
267        static UPLOADER: Lazy<RwLock<Option<Uploader>>> = Lazy::new(|| {
268            RwLock::new(build_uploader()).tap(|_| {
269                on_config_updated(|| {
270                    *UPLOADER.write().unwrap() = build_uploader();
271                    info!("UPLOADER reloaded: {:?}", UPLOADER);
272                })
273            })
274        });
275        return UPLOADER.read().unwrap().as_ref().map(|u| u.to_owned());
276
277        #[inline]
278        fn build_uploader() -> Option<Uploader> {
279            UploaderBuilder::from_env().map(|b| b.build())
280        }
281    }
282
283    /// 创建上传文件请求构建器
284    #[inline]
285    pub fn upload_file(&self, source: File) -> UploadRequestBuilder {
286        UploadRequestBuilder {
287            source,
288            inner: UploadRequestBuilderInner {
289                uploader: self,
290                object_name: None,
291                upload_progress_callback: None,
292                file_name: None,
293                mime_type: None,
294                metadata: None,
295                custom_vars: None,
296            },
297        }
298    }
299
300    /// 创建上传文件请求构建器
301    #[inline]
302    pub fn upload_path(&self, path: impl AsRef<Path>) -> IOResult<UploadRequestBuilder> {
303        let file = File::open(path.as_ref())?;
304        Ok(self.upload_file(file))
305    }
306}
307
308struct UploadRequestBuilderInner<'a> {
309    uploader: &'a Uploader,
310    object_name: Option<String>,
311    file_name: Option<String>,
312    mime_type: Option<String>,
313    metadata: Option<HashMap<String, String>>,
314    custom_vars: Option<HashMap<String, String>>,
315    upload_progress_callback: Option<UploadProgressCallback>,
316}
317
318/// 上传文件请求构建器
319pub struct UploadRequestBuilder<'a> {
320    source: File,
321    inner: UploadRequestBuilderInner<'a>,
322}
323
324impl<'a> UploadRequestBuilder<'a> {
325    /// 设置对象名称
326    #[inline]
327    pub fn object_name(mut self, object_name: impl Into<String>) -> Self {
328        self.inner.object_name = Some(object_name.into());
329        self
330    }
331
332    /// 设置原始文件名
333    #[inline]
334    pub fn file_name(mut self, file_name: impl Into<String>) -> Self {
335        self.inner.file_name = Some(file_name.into());
336        self
337    }
338
339    /// 设置文件 MIME 类型
340    #[inline]
341    pub fn mime_type(mut self, mime_type: impl Into<String>) -> Self {
342        self.inner.mime_type = Some(mime_type.into());
343        self
344    }
345
346    /// 追加自定义元数据
347    pub fn add_metadata(
348        mut self,
349        metadata_key: impl Into<String>,
350        metadata_value: impl Into<String>,
351    ) -> Self {
352        if let Some(metadata) = &mut self.inner.metadata {
353            metadata.insert(metadata_key.into(), metadata_value.into());
354        } else {
355            let mut metadata = HashMap::new();
356            metadata.insert(metadata_key.into(), metadata_value.into());
357            self.inner.metadata = Some(metadata);
358        }
359        self
360    }
361
362    /// 设置自定义元数据
363    #[inline]
364    pub fn metadata(mut self, metadata: HashMap<String, String>) -> Self {
365        self.inner.metadata = Some(metadata);
366        self
367    }
368
369    /// 追加自定义变量
370    pub fn add_custom_var(
371        mut self,
372        custom_var_name: impl Into<String>,
373        custom_var_value: impl Into<String>,
374    ) -> Self {
375        if let Some(custom_vars) = &mut self.inner.custom_vars {
376            custom_vars.insert(custom_var_name.into(), custom_var_value.into());
377        } else {
378            let mut custom_vars = HashMap::new();
379            custom_vars.insert(custom_var_name.into(), custom_var_value.into());
380            self.inner.custom_vars = Some(custom_vars);
381        }
382        self
383    }
384
385    /// 设置自定义变量
386    #[inline]
387    pub fn custom_vars(mut self, custom_vars: HashMap<String, String>) -> Self {
388        self.inner.custom_vars = Some(custom_vars);
389        self
390    }
391
392    /// 设置上传进度回调函数(仅在分片上传时生效)
393    #[inline]
394    pub fn upload_progress_callback(
395        mut self,
396        upload_progress_callback: UploadProgressCallback,
397    ) -> Self {
398        self.inner.upload_progress_callback = Some(upload_progress_callback);
399        self
400    }
401
402    /// 开始上传
403    pub fn start(self) -> HttpCallResult<UploadResult> {
404        let begin_at = Instant::now();
405        let object_name = self.inner.object_name.to_owned();
406        self.start_uploading()
407            .tap_ok(|_| {
408                info!(
409                    "done uploading, object_name {:?}, elapsed {:?}",
410                    object_name,
411                    begin_at.elapsed()
412                );
413            })
414            .tap_err(|err| {
415                error!(
416                    "error uploading, object_name {:?}, err: {:?}, elapsed {:?}",
417                    object_name,
418                    err,
419                    begin_at.elapsed()
420                );
421            })
422    }
423
424    fn start_uploading(self) -> HttpCallResult<UploadResult> {
425        if let Some(total_size) = self.source.size()? {
426            if total_size <= self.inner.uploader.inner.part_size {
427                self.inner.start_form_upload(Arc::new(self.source).into())
428            } else {
429                self.inner
430                    .start_resumable_upload(Arc::new(RwLock::new(self.source)).into())
431            }
432        } else {
433            self.start_uploading_reader()
434        }
435    }
436
437    fn start_uploading_reader(mut self) -> HttpCallResult<UploadResult> {
438        let first_chunk = {
439            let mut chunk_buf = Vec::new();
440            let source = &mut self.source;
441            source
442                .take(self.inner.uploader.inner.part_size + 1)
443                .read_to_end(&mut chunk_buf)?;
444            chunk_buf
445        };
446        if first_chunk.len() as u64 <= self.inner.uploader.inner.part_size {
447            self.inner.start_form_upload(Arc::new(first_chunk).into())
448        } else {
449            self.inner.start_resumable_upload(UploadSource::from_reader(
450                Cursor::new(first_chunk).chain(self.source),
451            ))
452        }
453    }
454}
455
456impl<'a> UploadRequestBuilderInner<'a> {
457    fn start_form_upload(self, source: FormUploadSource) -> HttpCallResult<UploadResult> {
458        let upload_token_provider = self.make_upload_token_provider();
459        let mut form_upload_result =
460            self.uploader
461                .inner
462                .api_caller
463                .form_upload(&FormUploadRequest::new(
464                    &upload_token_provider,
465                    self.object_name.as_deref(),
466                    self.file_name.as_deref(),
467                    self.mime_type.as_deref(),
468                    source,
469                    self.metadata,
470                    self.custom_vars,
471                ))?;
472        Ok(UploadResult {
473            response_body: take(form_upload_result.response_body_mut()),
474        })
475    }
476
477    fn start_resumable_upload(self, upload_source: UploadSource) -> HttpCallResult<UploadResult> {
478        let upload_token_provider = self.make_upload_token_provider();
479        let init_parts_response =
480            self.uploader
481                .inner
482                .api_caller
483                .init_parts(&InitPartsRequest::new(
484                    &upload_token_provider,
485                    self.uploader.inner.bucket_name.as_ref(),
486                    self.object_name.as_deref(),
487                ))?;
488        let mut partitioner = upload_source.part(self.uploader.inner.part_size)?;
489        let mut part_number = 1u32;
490        let mut uploaded = 0u64;
491        let mut completed_parts = Vec::new();
492        while let Some(part_reader) = partitioner.next_part_reader()? {
493            let mut upload_result =
494                self.uploader
495                    .inner
496                    .api_caller
497                    .upload_part(&UploadPartRequest::new(
498                        &upload_token_provider,
499                        self.uploader.inner.bucket_name.as_ref(),
500                        self.object_name.as_deref(),
501                        init_parts_response.response_body().upload_id(),
502                        part_number,
503                        part_reader,
504                    ))?;
505            uploaded = uploaded.saturating_add(upload_result.uploaded());
506            if let Some(upload_progress_callback) = &self.upload_progress_callback {
507                upload_progress_callback(&UploadProgressInfo {
508                    upload_id: init_parts_response.response_body().upload_id(),
509                    uploaded,
510                    part_number,
511                })?;
512            }
513            completed_parts.push(CompletePartInfo::new(
514                take(upload_result.response_body_mut().etag_mut()),
515                part_number,
516            ));
517            part_number = part_number.saturating_add(1);
518        }
519        let mut complete_parts_result =
520            self.uploader
521                .inner
522                .api_caller
523                .complete_parts(&CompletePartsRequest::new(
524                    &upload_token_provider,
525                    self.uploader.inner.bucket_name.as_ref(),
526                    self.object_name.as_deref(),
527                    init_parts_response.response_body().upload_id(),
528                    CompletePartsRequestBody::new(
529                        completed_parts,
530                        self.file_name,
531                        self.mime_type,
532                        self.metadata,
533                        self.custom_vars,
534                    ),
535                ))?;
536        Ok(UploadResult {
537            response_body: take(complete_parts_result.response_body_mut()),
538        })
539    }
540
541    #[inline]
542    fn make_upload_token_provider(&self) -> BucketOrObjectUploadTokenProvider {
543        BucketOrObjectUploadTokenProvider::new(
544            self.uploader.inner.bucket_name.to_owned(),
545            self.object_name.to_owned(),
546            Duration::from_secs(600),
547            self.uploader.inner.credential.to_owned(),
548        )
549    }
550}
551
552/// 上传结果
553#[derive(Debug, Clone)]
554pub struct UploadResult {
555    response_body: JSONValue,
556}
557
558impl UploadResult {
559    /// 获取上传结果响应
560    #[inline]
561    pub fn response_body(&self) -> &JSONValue {
562        &self.response_body
563    }
564}
565
/// Upload progress information.
///
/// A value of this type is passed to the upload progress callback after each part of a
/// multipart upload has been uploaded.
#[derive(Debug, Clone)]
pub struct UploadProgressInfo<'a> {
    /// Upload ID of the multipart upload.
    upload_id: &'a str,
    /// Running total of the amounts reported as uploaded so far.
    uploaded: u64,
    /// Number of the part that was just uploaded.
    part_number: u32,
}

impl<'a> UploadProgressInfo<'a> {
    /// Returns the upload ID.
    ///
    /// The returned string borrows for the full `'a` lifetime of the stored ID rather than
    /// for the borrow of `self`, so it remains usable after this value has been dropped.
    #[inline]
    pub fn upload_id(&self) -> &'a str {
        self.upload_id
    }

    /// Returns the amount of data uploaded so far.
    #[inline]
    pub fn uploaded(&self) -> u64 {
        self.uploaded
    }

    /// Returns the number of the part that was uploaded successfully.
    #[inline]
    pub fn part_number(&self) -> u32 {
        self.part_number
    }
}
593
594#[inline]
595fn is_client_error_status(code: StatusCode) -> bool {
596    return code.is_client_error() && code != to_status_code(406)
597        || [
598            to_status_code(501),
599            to_status_code(573),
600            to_status_code(608),
601            to_status_code(612),
602            to_status_code(614),
603            to_status_code(616),
604            to_status_code(619),
605            to_status_code(630),
606            to_status_code(631),
607            to_status_code(640),
608            to_status_code(701),
609        ]
610        .contains(&code);
611
612    #[inline]
613    fn to_status_code(code: u16) -> StatusCode {
614        StatusCode::from_u16(code).expect("Invalid status code")
615    }
616}
617
618#[derive(Debug)]
619enum BucketOrObjectUploadTokenProvider {
620    Bucket(BucketUploadTokenProvider),
621    Object(ObjectUploadTokenProvider),
622}
623
624impl BucketOrObjectUploadTokenProvider {
625    #[inline]
626    fn new(
627        bucket: String,
628        object: Option<String>,
629        upload_token_lifetime: Duration,
630        credential: Arc<dyn CredentialProvider>,
631    ) -> Self {
632        if let Some(object) = object {
633            Self::Object(ObjectUploadTokenProvider::new(
634                bucket,
635                object,
636                upload_token_lifetime,
637                credential,
638            ))
639        } else {
640            Self::Bucket(BucketUploadTokenProvider::new(
641                bucket,
642                upload_token_lifetime,
643                credential,
644            ))
645        }
646    }
647}
648
649impl UploadTokenProvider for BucketOrObjectUploadTokenProvider {
650    #[inline]
651    fn access_key(&self) -> ParseResult<Cow<str>> {
652        match self {
653            Self::Bucket(bucket_upload_token_provider) => bucket_upload_token_provider.access_key(),
654            Self::Object(object_upload_token_provider) => object_upload_token_provider.access_key(),
655        }
656    }
657
658    #[inline]
659    fn policy(&self) -> ParseResult<Cow<UploadPolicy>> {
660        match self {
661            Self::Bucket(bucket_upload_token_provider) => bucket_upload_token_provider.policy(),
662            Self::Object(object_upload_token_provider) => object_upload_token_provider.policy(),
663        }
664    }
665
666    #[inline]
667    fn to_string(&self) -> IOResult<Cow<str>> {
668        match self {
669            Self::Bucket(bucket_upload_token_provider) => bucket_upload_token_provider.to_string(),
670            Self::Object(object_upload_token_provider) => object_upload_token_provider.to_string(),
671        }
672    }
673
674    #[inline]
675    fn as_upload_token_provider(&self) -> &dyn UploadTokenProvider {
676        self
677    }
678
679    #[inline]
680    fn as_any(&self) -> &dyn std::any::Any {
681        self
682    }
683}
684
#[cfg(test)]
mod tests {
    use super::*;
    use digest::{generic_array::GenericArray, Digest};
    use md5::Md5;
    use rand::{prelude::*, rngs::OsRng};
    use reqwest::blocking::get;
    use std::{
        env,
        io::{copy, Read, Seek, SeekFrom},
        time::{SystemTime, UNIX_EPOCH},
    };
    use tempfile::tempfile;

    /// MD5 digest as produced by [`Md5`].
    type Md5Digest = GenericArray<u8, <Md5 as Digest>::OutputSize>;

    /// Uploads files of several sizes on both sides of the default part size and checks that
    /// the content downloaded afterwards has the same MD5 digest.
    ///
    /// Requires the `QINIU_*` environment variables and network access.
    #[test]
    fn test_upload_files() -> anyhow::Result<()> {
        env_logger::try_init().ok();

        let (uploader, bucket_domain) = uploader_and_domain_from_env()?;

        let sizes: [u64; 6] = [
            1023,
            1025,
            (1 << 20) * 3,
            (1 << 20) * 4,
            (1 << 20) * 4 + 2,
            (1 << 20) * 9 - 2,
        ];
        for &size in sizes.iter() {
            upload_and_verify(&uploader, &bucket_domain, size)?;
        }
        Ok(())
    }

    /// Uploads two different files under the same object name and checks that the downloaded
    /// content matches the second one.
    ///
    /// Requires the `QINIU_*` environment variables and network access.
    #[test]
    fn test_upload_overwritten_files() -> anyhow::Result<()> {
        env_logger::try_init().ok();

        let (uploader, bucket_domain) = uploader_and_domain_from_env()?;
        const SIZE: u64 = 1 << 10;
        let key = timestamped_object_name(SIZE)?;

        let (first_file, _) = generate_file_with_md5(SIZE)?;
        let (second_file, second_md5) = generate_file_with_md5(SIZE)?;

        let _ = uploader
            .upload_file(first_file)
            .object_name(key.to_owned())
            .start()?;
        let result = uploader.upload_file(second_file).object_name(key).start()?;

        let returned_md5 = download_md5(&bucket_domain, uploaded_key(&result))?;
        assert_eq!(second_md5, returned_md5);
        Ok(())
    }

    /// Reads the test configuration from the environment and builds an uploader from it.
    ///
    /// Returns the uploader together with the bucket's download domain.
    fn uploader_and_domain_from_env() -> anyhow::Result<(Uploader, String)> {
        let access_key = env::var("QINIU_ACCESS_KEY")?;
        let secret_key = env::var("QINIU_SECRET_KEY")?;
        let bucket_name = env::var("QINIU_BUCKET_NAME")?;
        let bucket_domain = env::var("QINIU_BUCKET_DOMAIN")?;
        let uc_url = env::var("QINIU_UC_URL")?;

        let uploader = UploaderBuilder::new(access_key, secret_key, bucket_name)
            .uc_urls(vec![uc_url])
            .build();
        Ok((uploader, bucket_domain))
    }

    /// Builds an object name from the size and the current time in milliseconds.
    fn timestamped_object_name(size: u64) -> anyhow::Result<String> {
        let millis = SystemTime::now().duration_since(UNIX_EPOCH)?.as_millis();
        Ok(format!("upload-{}-{}", size, millis))
    }

    /// Extracts the `key` string from an upload response, panicking if it is missing.
    fn uploaded_key(result: &UploadResult) -> &str {
        result
            .response_body()
            .get("key")
            .and_then(|v| v.as_str())
            .unwrap()
    }

    /// Downloads `key` from `bucket_domain` over HTTP and returns the MD5 digest of the body.
    fn download_md5(bucket_domain: &str, key: &str) -> anyhow::Result<Md5Digest> {
        let url = format!("http://{}/{}", bucket_domain, key);
        let mut response = get(&url)?;
        let mut hasher = Md5::new();
        copy(&mut response, &mut hasher)?;
        Ok(hasher.finalize())
    }

    /// Uploads a random file of `size` bytes and checks the digest of the downloaded object.
    fn upload_and_verify(
        uploader: &Uploader,
        bucket_domain: &str,
        size: u64,
    ) -> anyhow::Result<()> {
        let (file, expected_md5) = generate_file_with_md5(size)?;
        let object_name = timestamped_object_name(size)?;
        let result = uploader
            .upload_file(file)
            .object_name(object_name)
            .start()?;

        let returned_md5 = download_md5(bucket_domain, uploaded_key(&result))?;
        assert_eq!(expected_md5, returned_md5);
        Ok(())
    }

    /// Creates a temporary file holding `size` random bytes.
    ///
    /// Returns the file, rewound to its start, together with the MD5 digest of its content.
    #[inline]
    fn generate_file_with_md5(size: u64) -> anyhow::Result<(File, Md5Digest)> {
        let mut file = tempfile()?;
        let rng = Box::new(OsRng) as Box<dyn RngCore>;
        copy(&mut rng.take(size), &mut file)?;
        file.seek(SeekFrom::Start(0))?;

        let mut hasher = Md5::new();
        copy(&mut file, &mut hasher)?;
        // Rewind again so the caller reads the file from the beginning.
        file.seek(SeekFrom::Start(0))?;

        Ok((file, hasher.finalize()))
    }
}