qiniu_uploader/
lib.rs

1//! 七牛文件上传
2
3//! use qiniu_uploader::{QiniuRegionEnum, QiniuUploader};
4//! use tokio::fs;
5//!
6//! #[tokio::main]
7//! async fn main() -> Result<(), anyhow::Error> {
8//!     let qiniu = QiniuUploader::new(
9//!         "access_key",
10//!         "secret_key",
11//!         "bucket",
12//!         Some(QiniuRegionEnum::Z0),
13//!         false,
14//!     );
15//!     let file = fs::File::open("./Cargo.lock").await?;
16//!     let file_size = file.metadata().await?.len() as usize;
17//!     qiniu
18//!         .clone()
19//!         .part_upload_file(
20//!             "test/Cargo.lock",
21//!             file,
22//!             file_size,
23//!             Some(1024 * 1024 * 50), // 分片大小
24//!             Some(10),               // 上传线程数量
25//!             None,                   // 进度条样式
26//!         )
27//!         .await?;
28//!     Ok(())
29//! }
30
31#![allow(dead_code)]
32#![cfg_attr(feature = "docs", feature(doc_cfg))]
33
34pub use anyhow::anyhow;
35use serde::{Deserialize, Serialize};
36use std::{collections::HashMap, str::FromStr};
37
38use base64::prelude::*;
39
40#[cfg(feature = "progress-bar")]
41use indicatif::{MultiProgress, ProgressBar, ProgressStyle};
42
43#[cfg(feature = "progress-bar")]
44use futures_util::TryStreamExt;
45
46use mime::{Mime, APPLICATION_JSON, APPLICATION_OCTET_STREAM};
47use reqwest::{
48    header::{HeaderMap, HeaderName, HeaderValue, AUTHORIZATION, CONTENT_LENGTH, CONTENT_TYPE},
49    multipart::{self, Form},
50    Body,
51};
52use tokio::io::AsyncReadExt;
53
54#[cfg(feature = "progress-bar")]
55use tokio_util::io::ReaderStream;
56
57#[cfg(feature = "progress-bar")]
58pub use indicatif;
59
60#[cfg(feature = "progress-bar")]
61use std::io::Cursor;
62
63#[cfg(feature = "progress-bar")]
64use tokio::io::BufReader;
65
66pub use mime;
67
68/// Qiniu上传实例
69#[derive(Debug, Clone)]
70pub struct QiniuUploader {
71    access_key: String,
72    secret_key: String,
73    bucket: String,
74    region: QiniuRegionEnum,
75    debug: bool,
76}
77
/// Qiniu storage regions, see <https://developer.qiniu.com/kodo/1671/region-endpoint-fq>
#[derive(Clone, Copy, Debug)]
pub enum QiniuRegionEnum {
    /// Region id `z0`.
    Z0,
    /// Region id `cn-east-2`.
    CNEast2,
    /// Region id `z1`.
    Z1,
    /// Region id `z2`.
    Z2,
    /// Region id `na0`.
    NA0,
    /// Region id `as0`.
    AS0,
    /// Region id `ap-southeast-2`.
    APSouthEast2,
    /// Region id `ap-southeast-3`.
    APSouthEast3,
}

impl QiniuRegionEnum {
    /// Returns the HTTPS upload endpoint (scheme + host, no trailing slash)
    /// that serves this region.
    pub fn get_upload_host(&self) -> String {
        let host: &'static str = match *self {
            Self::Z0 => "https://up-z0.qiniup.com",
            Self::CNEast2 => "https://up-cn-east-2.qiniup.com",
            Self::Z1 => "https://up-z1.qiniup.com",
            Self::Z2 => "https://up-z2.qiniup.com",
            Self::NA0 => "https://up-na0.qiniup.com",
            Self::AS0 => "https://up-as0.qiniup.com",
            Self::APSouthEast2 => "https://up-ap-southeast-2.qiniup.com",
            Self::APSouthEast3 => "https://up-ap-southeast-3.qiniup.com",
        };
        host.to_owned()
    }
}
105
106impl FromStr for QiniuRegionEnum {
107    type Err = anyhow::Error;
108
109    fn from_str(region: &str) -> Result<Self, Self::Err> {
110        let region = match region {
111            "z0" => Self::Z0,
112            "cn-east-2" => Self::CNEast2,
113            "z1" => Self::Z1,
114            "z2" => Self::Z2,
115            "na0" => Self::NA0,
116            "as0" => Self::AS0,
117            "ap-southeast-2" => Self::APSouthEast2,
118            "ap-southeast-3" => Self::APSouthEast3,
119            _ => return Err(anyhow!("Unknow region: {}", region)),
120        };
121        Ok(region)
122    }
123}
124
125/// 初始化分片任务响应
126#[derive(Debug, Deserialize)]
127struct InitialPartUploadResponse {
128    #[serde(rename = "uploadId")]
129    pub upload_id: String,
130
131    #[serde(rename = "expireAt")]
132    pub expire_at: i64,
133}
134
135/// 分片上传响应
136#[derive(Debug, Deserialize, Default)]
137struct PartUploadResponse {
138    pub etag: String,
139    pub md5: String,
140}
141
142/// 完成分片上传参数
143#[derive(Debug, Serialize)]
144struct CompletePartUploadParam {
145    pub etag: String,
146
147    #[serde(rename = "partNumber")]
148    pub part_number: i64,
149}
150
/// Smallest part size used for multipart uploads: 1 MiB.
const PART_MIN_SIZE: usize = 1 << 20;

/// Largest part size used for multipart uploads: 1 GiB.
const PART_MAX_SIZE: usize = 1 << 30;
156
157impl QiniuUploader {
158    /// # 生成上传实例
159    /// ## 参数
160    /// - access_key: 七牛access_key
161    /// - secret_key: 七牛secret_key
162    /// - bucket: 七牛bucket
163    /// - region: 七牛上传区域,默认z0
164    /// - debug: 是否开启debug
165    pub fn new(
166        access_key: impl Into<String>,
167        secret_key: impl Into<String>,
168        bucket: impl Into<String>,
169        region: Option<QiniuRegionEnum>,
170        debug: bool,
171    ) -> Self {
172        let region = region.unwrap_or(QiniuRegionEnum::Z0);
173        Self {
174            access_key: access_key.into(),
175            secret_key: secret_key.into(),
176            bucket: bucket.into(),
177            region,
178            debug,
179        }
180    }
181
182    fn get_upload_token(&self, key: &str) -> String {
183        let deadline = chrono::Local::now().timestamp() + 3600;
184        let put_policy = r#"{"scope": "{bucket}:{key}", "deadline": {deadline}, "fsizeLimit": 1073741824, "returnBody": "{\"hash\": $(etag), \"key\": $(key)}"}"#;
185        let put_policy = put_policy
186            .replace("{bucket}", &self.bucket)
187            .replace("{deadline}", &deadline.to_string())
188            .replace("{key}", key);
189        let mut buf = String::new();
190        BASE64_URL_SAFE.encode_string(put_policy, &mut buf);
191        let hmac_digest = hmac_sha1::hmac_sha1(self.secret_key.as_bytes(), buf.as_bytes());
192        let mut sign = String::new();
193        BASE64_URL_SAFE.encode_string(hmac_digest, &mut sign);
194        let token = format!("{}:{sign}:{buf}", self.access_key);
195        if self.debug {
196            println!("key: {}, token: {}", key, token);
197        }
198        token
199    }
200
201    /// 直传文件组装multi_part <https://developer.qiniu.com/kodo/1312/upload>
202    fn make_multi_part<T: Into<Body>>(&self, key: &str, body: T, mime: Mime) -> Form {
203        let token = self.get_upload_token(key);
204        let mut headers = HeaderMap::new();
205        headers.insert(
206            HeaderName::from_str("Content-Type").unwrap(),
207            HeaderValue::from_str(mime.essence_str()).unwrap(),
208        );
209        let file_name = key.split("/").last().unwrap().to_string();
210        multipart::Form::new()
211            .part(
212                "file",
213                multipart::Part::stream(body)
214                    .file_name(file_name.clone())
215                    .headers(headers)
216                    .mime_str(mime.essence_str())
217                    .unwrap(),
218            )
219            .text("key", key.to_string())
220            .text("token", token)
221            .text("filename", file_name)
222    }
223
224    /// # 直传文件,带进度条
225    /// <https://developer.qiniu.com/kodo/1312/upload>
226    /// ## 参数
227    /// - key: 上传文件的key,如test/Cargo.lock
228    /// - data: R: AsyncReadExt + Unpin + Send + Sync + 'static
229    /// - mime: 文件类型
230    /// - file_size: 文件大小,单位 Byte
231    /// - progress_style: 进度条样式
232    #[cfg_attr(feature = "docs", doc(cfg(feature = "progress-bar")))]
233    #[cfg(feature = "progress-bar")]
234    pub async fn upload_file<R: AsyncReadExt + Unpin + Send + Sync + 'static>(
235        &self,
236        key: &str,
237        data: R,
238        mime: Mime,
239        file_size: usize,
240        progress_style: Option<ProgressStyle>,
241    ) -> Result<(), anyhow::Error> {
242        let reader = ReaderStream::new(data);
243        let pb = ProgressBar::new(file_size as u64);
244        let sty = match progress_style {
245            Some(sty)=>sty,
246            None=> ProgressStyle::default_bar().template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({bytes_per_sec}, {eta})").unwrap().progress_chars("#>-")
247        };
248        pb.set_style(sty);
249        let pb1 = pb.clone();
250        let stream = reader.inspect_ok(move |chunk| {
251            pb1.inc(chunk.len() as u64);
252        });
253        let body = Body::wrap_stream(stream);
254        let resp = self.upload_file_no_progress_bar(key, body, mime).await;
255        pb.finish();
256        resp
257    }
258
259    /// # 直传文件,没有进度条
260    /// <https://developer.qiniu.com/kodo/1312/upload>
261    /// ## 参数
262    /// - key: 上传文件的key,如test/Cargo.lock
263    /// - data: T: Into\<Body\>
264    /// - mime: 文件类型
265    #[cfg_attr(feature = "docs", doc(cfg(feature = "progress-bar")))]
266    #[cfg(feature = "progress-bar")]
267    pub async fn upload_file_no_progress_bar<T: Into<Body>>(
268        &self,
269        key: &str,
270        data: T,
271        mime: Mime,
272    ) -> Result<(), anyhow::Error> {
273        let form = self.make_multi_part(key, data, mime);
274        let response = reqwest::Client::new()
275            .post(self.region.get_upload_host())
276            .multipart(form)
277            .send()
278            .await?;
279        if !response.status().is_success() {
280            return Err(anyhow!(
281                "Failed to upload file: {} {}",
282                response.status().as_u16(),
283                response
284                    .text()
285                    .await
286                    .unwrap_or_else(|_| "Unknown error".to_string())
287            ));
288        }
289        if self.debug {
290            println!("upload_file response: {:#?}", response);
291        }
292        Ok(())
293    }
294
295    /// # 直传文件,没有进度条
296    /// <https://developer.qiniu.com/kodo/1312/upload>
297    /// ## 参数
298    /// - key: 上传文件的key,如test/Cargo.lock
299    /// - data: T: `Into<Body>`
300    /// - mime: 文件类型
301    #[cfg_attr(feature = "docs", doc(cfg(not(feature = "progress-bar"))))]
302    #[cfg(not(feature = "progress-bar"))]
303    pub async fn upload_file<T: Into<Body>>(
304        &self,
305        key: &str,
306        data: T,
307        mime: Mime,
308    ) -> Result<(), anyhow::Error> {
309        let form = self.make_multi_part(key, data, mime);
310        let response = reqwest::Client::new()
311            .post(self.region.get_upload_host())
312            .multipart(form)
313            .send()
314            .await?;
315        if !response.status().is_success() {
316            return Err(anyhow!(
317                "Failed to upload file: {} {}",
318                response.status().as_u16(),
319                response
320                    .text()
321                    .await
322                    .unwrap_or_else(|_| "Unknown error".to_string())
323            ));
324        }
325        if self.debug {
326            println!("upload_file response: {:#?}", response);
327        }
328        Ok(())
329    }
330
331    fn get_part_upload_token(&self, key: &str) -> String {
332        format!("UpToken {}", self.get_upload_token(key))
333    }
334
335    fn get_base64encode_key(&self, key: &str) -> String {
336        let mut res = String::new();
337        BASE64_URL_SAFE.encode_string(key, &mut res);
338        res
339    }
340
341    fn get_part_headers(&self, key: &str) -> HeaderMap {
342        let mut headers = HeaderMap::new();
343        headers.insert(
344            AUTHORIZATION,
345            HeaderValue::from_str(&self.get_part_upload_token(key)).unwrap(),
346        );
347        headers
348    }
349
350    /// 初始化任务 <https://developer.qiniu.com/kodo/6365/initialize-multipartupload>
351    async fn initial_part_upload(
352        &self,
353        key: &str,
354    ) -> Result<InitialPartUploadResponse, anyhow::Error> {
355        let url = format!(
356            "{}/buckets/{}/objects/{}/uploads",
357            self.region.get_upload_host(),
358            self.bucket,
359            self.get_base64encode_key(key),
360        );
361        let headers = self.get_part_headers(key);
362        let response = reqwest::Client::new()
363            .post(url)
364            .headers(headers)
365            .send()
366            .await?
367            .json::<InitialPartUploadResponse>()
368            .await?;
369        if self.debug {
370            println!("initial_part_upload response: {:#?}", response);
371        }
372        Ok(response)
373    }
374    /// 分块上传数据 <https://developer.qiniu.com/kodo/6366/upload-part>
375    async fn part_upload_no_progress_bar<T: Into<Body>>(
376        &self,
377        key: &str,
378        upload_id: &str,
379        part_number: i32,
380        file_size: usize,
381        data: T,
382    ) -> Result<PartUploadResponse, anyhow::Error> {
383        let url = format!(
384            "{}/buckets/{}/objects/{}/uploads/{upload_id}/{part_number}",
385            self.region.get_upload_host(),
386            self.bucket,
387            self.get_base64encode_key(key),
388        );
389        let mut headers = self.get_part_headers(key);
390        headers.insert(
391            CONTENT_TYPE,
392            HeaderValue::from_str(APPLICATION_OCTET_STREAM.essence_str()).unwrap(),
393        );
394        headers.insert(
395            CONTENT_LENGTH,
396            HeaderValue::from_str(&file_size.to_string()).unwrap(),
397        );
398        let response = reqwest::Client::new()
399            .put(url)
400            .headers(headers)
401            .body(data)
402            .send()
403            .await?
404            .json::<PartUploadResponse>()
405            .await;
406        let response = match response {
407            Ok(response) => response,
408            Err(e) => {
409                return Err(anyhow!("上传任务发生异常,{}", e.to_string()));
410            }
411        };
412        if self.debug {
413            println!("part_upload response: {:#?}", response);
414        }
415        Ok(response)
416    }
417
418    /// 分块上传数据 <https://developer.qiniu.com/kodo/6366/upload-part>
419    #[cfg_attr(feature = "docs", doc(cfg(feature = "progress-bar")))]
420    #[cfg(feature = "progress-bar")]
421    async fn part_upload(
422        &self,
423        key: &str,
424        upload_id: &str,
425        part_number: i32,
426        data: Vec<u8>,
427        pb: ProgressBar,
428    ) -> Result<PartUploadResponse, anyhow::Error> {
429        let size = data.len();
430        let reader = ReaderStream::new(BufReader::new(Cursor::new(data)));
431        let pb1 = pb.clone();
432        let stream = reader.inspect_ok(move |chunk| {
433            pb1.inc(chunk.len() as u64);
434        });
435        let body = Body::wrap_stream(stream);
436        let resp = self
437            .part_upload_no_progress_bar(key, upload_id, part_number, size, body)
438            .await;
439        pb.finish();
440        resp
441    }
442
443    /// 完成文件上传 <https://developer.qiniu.com/kodo/6368/complete-multipart-upload>
444    async fn complete_part_upload(
445        &self,
446        key: &str,
447        upload_id: &str,
448        parts: Vec<CompletePartUploadParam>,
449    ) -> Result<(), anyhow::Error> {
450        let url = format!(
451            "{}/buckets/{}/objects/{}/uploads/{upload_id}",
452            self.region.get_upload_host(),
453            self.bucket,
454            self.get_base64encode_key(key)
455        );
456        let mut headers = self.get_part_headers(key);
457        headers.insert(
458            CONTENT_TYPE,
459            APPLICATION_JSON.essence_str().try_into().unwrap(),
460        );
461        let mut data = HashMap::new();
462        data.insert("parts", parts);
463        let response = reqwest::Client::new()
464            .post(url)
465            .json(&data)
466            .headers(headers)
467            .send()
468            .await?;
469        if self.debug {
470            println!("complete_part_upload response: {:#?}", response);
471        }
472        if !response.status().is_success() {
473            return Err(anyhow!(
474                "Failed to complete_part_upload: {} {}",
475                response.status().as_u16(),
476                response
477                    .text()
478                    .await
479                    .unwrap_or_else(|_| "Unknown error".to_string())
480            ));
481        }
482        if self.debug {
483            println!("complete_part_upload response: {:#?}", response);
484        }
485        Ok(())
486    }
487
488    /// # 分片上传 v2 版,显示进度条
489    /// <https://developer.qiniu.com/kodo/6364/multipartupload-interface>
490    /// ## 参数
491    /// - key: 上传的key,如`test/Cargo.lock`
492    /// - data: R: AsyncReadExt + Unpin + Send + Sync + 'static
493    /// - file_size: 文件大小,单位 Byte
494    /// - part_size: 分片上传的大小,单位Byte,1M-1GB之间,如果指定,优先级比`threads`参数高
495    /// - threads: 分片上传线程,在未指定`part_size`参数的情况下生效,默认5
496    /// - progress_style: 进度条样式
497    #[cfg_attr(feature = "docs", doc(cfg(feature = "progress-bar")))]
498    #[cfg(feature = "progress-bar")]
499    pub async fn part_upload_file<R: AsyncReadExt + Unpin + Send + Sync + 'static>(
500        self,
501        key: &str,
502        mut data: R,
503        file_size: usize,
504        part_size: Option<usize>,
505        threads: Option<u8>,
506        progress_style: Option<ProgressStyle>,
507    ) -> Result<(), anyhow::Error> {
508        let initiate = self.initial_part_upload(key).await?;
509        let upload_id = initiate.upload_id;
510        let mut part_number = 0;
511        let mut upload_bytes = 0;
512        let mut handles = Vec::new();
513        let multi = MultiProgress::new();
514        let sty = match progress_style {
515            Some(sty)=>sty,
516            None=> ProgressStyle::default_bar().template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({bytes_per_sec}, {eta})").unwrap().progress_chars("#>-")
517        };
518        // 单个 Part大小范围 1 MB - 1 GB,如果未指定part_size,默认5个线程
519        let mut part_size = match part_size {
520            Some(size) => size,
521            None => file_size / threads.unwrap_or(5) as usize,
522        };
523        part_size = part_size.clamp(PART_MIN_SIZE, PART_MAX_SIZE);
524        loop {
525            if upload_bytes >= file_size {
526                break;
527            }
528            let last_bytes = file_size - upload_bytes;
529            let mut part_size1 = part_size;
530            // 倒数第二次上传后剩余小于1M,附加到倒数第二次上传
531            if last_bytes < part_size + PART_MIN_SIZE && last_bytes < PART_MAX_SIZE {
532                part_size1 = last_bytes;
533            }
534            let mut buf = vec![0; part_size1];
535            data.read_exact(&mut buf).await?;
536            part_number += 1;
537            upload_bytes += part_size1;
538            let this = self.clone();
539            let key = key.to_string();
540            let upload_id = upload_id.clone();
541            let pb = multi.add(ProgressBar::new(buf.len() as u64));
542            pb.set_style(sty.clone());
543            let handle = tokio::spawn(async move {
544                // 尝试10次
545                let mut try_times = 10;
546                let mut resp = Err(anyhow!("发生异常"));
547                while try_times > 0 {
548                    resp = this
549                        .part_upload(&key, &upload_id, part_number, buf.clone(), pb.clone())
550                        .await;
551                    if resp.is_ok() {
552                        break;
553                    }
554                    try_times -= 1;
555                }
556                resp
557            });
558            handles.push(handle);
559        }
560        let mut parts = Vec::new();
561        for (i, handle) in handles.into_iter().enumerate() {
562            match handle.await? {
563                Ok(res) => {
564                    parts.push(CompletePartUploadParam {
565                        etag: res.etag.clone(),
566                        part_number: (i + 1) as i64,
567                    });
568                }
569                Err(e) => {
570                    self.part_abort(key, &upload_id).await?;
571                    return Err(e);
572                }
573            }
574        }
575        if self.debug {
576            println!("parts: {:#?}", parts);
577        }
578        // complete part upload
579        self.complete_part_upload(key, &upload_id, parts).await?;
580        Ok(())
581    }
582
583    /// # 分片上传 v2 版,不显示进度条
584    /// <https://developer.qiniu.com/kodo/6364/multipartupload-interface>
585    /// ## 参数
586    /// - key: 上传的key,如`test/Cargo.lock`
587    /// - data: R: AsyncReadExt + Unpin + Send + Sync + 'static
588    /// - file_size: 文件大小,单位 Byte
589    /// - part_size: 分片上传的大小,单位Byte,1M-1GB之间,如果指定,优先级比`threads`参数高
590    /// - threads: 分片上传线程,在未指定`part_size`参数的情况下生效,默认5
591    #[cfg_attr(feature = "docs", doc(cfg(feature = "progress-bar")))]
592    #[cfg(feature = "progress-bar")]
593    pub async fn part_upload_file_no_progress_bar<
594        R: AsyncReadExt + Unpin + Send + Sync + 'static,
595    >(
596        self,
597        key: &str,
598        mut data: R,
599        file_size: usize,
600        part_size: Option<usize>,
601        threads: Option<u8>,
602    ) -> Result<(), anyhow::Error> {
603        let initiate = self.initial_part_upload(key).await?;
604        let upload_id = initiate.upload_id;
605        let mut part_number = 0;
606        let mut upload_bytes = 0;
607        let mut handles = Vec::new();
608        // 单个 Part大小范围 1 MB - 1 GB,如果未指定part_size,默认5个线程
609        let mut part_size = match part_size {
610            Some(size) => size,
611            None => file_size / threads.unwrap_or(5) as usize,
612        };
613        part_size = part_size.clamp(PART_MIN_SIZE, PART_MAX_SIZE);
614        loop {
615            if upload_bytes >= file_size {
616                break;
617            }
618            let last_bytes = file_size - upload_bytes;
619            let mut part_size1 = part_size;
620            // 倒数第二次上传后剩余小于1M,附加到倒数第二次上传
621            if last_bytes < part_size + PART_MIN_SIZE && last_bytes < PART_MAX_SIZE {
622                part_size1 = last_bytes;
623            }
624            let mut buf = vec![0; part_size1];
625            data.read_exact(&mut buf).await?;
626            part_number += 1;
627            upload_bytes += part_size1;
628            let this = self.clone();
629            let key = key.to_string();
630            let upload_id = upload_id.clone();
631            let handle = tokio::spawn(async move {
632                // 尝试10次
633                let mut try_times = 10;
634                let mut resp = Err(anyhow!("发生异常"));
635                while try_times > 0 {
636                    resp = this
637                        .part_upload_no_progress_bar(
638                            &key,
639                            &upload_id,
640                            part_number,
641                            part_size1,
642                            buf.clone(),
643                        )
644                        .await;
645                    if resp.is_ok() {
646                        break;
647                    }
648                    try_times -= 1;
649                }
650                resp
651            });
652            handles.push(handle);
653        }
654        let mut parts = Vec::new();
655        for (i, handle) in handles.into_iter().enumerate() {
656            match handle.await? {
657                Ok(res) => {
658                    parts.push(CompletePartUploadParam {
659                        etag: res.etag.clone(),
660                        part_number: (i + 1) as i64,
661                    });
662                }
663                Err(e) => {
664                    self.part_abort(key, &upload_id).await?;
665                    return Err(e);
666                }
667            }
668        }
669        if self.debug {
670            println!("parts: {:#?}", parts);
671        }
672        // complete part upload
673        self.complete_part_upload(key, &upload_id, parts).await?;
674        Ok(())
675    }
676
677    /// # 分片上传 v2 版,不显示进度条
678    /// <https://developer.qiniu.com/kodo/6364/multipartupload-interface>
679    /// ## 参数
680    /// - key: 上传的key,如`test/Cargo.lock`
681    /// - data: R: AsyncReadExt + Unpin + Send + Sync + 'static
682    /// - file_size: 文件大小,单位 Byte
683    /// - part_size: 分片上传的大小,单位Byte,1M-1GB之间,如果指定,优先级比`threads`参数高
684    /// - threads: 分片上传线程,在未指定`part_size`参数的情况下生效,默认5
685    #[cfg_attr(feature = "docs", doc(cfg(not(feature = "progress-bar"))))]
686    #[cfg(not(feature = "progress-bar"))]
687    pub async fn part_upload_file<R: AsyncReadExt + Unpin + Send + Sync + 'static>(
688        self,
689        key: &str,
690        mut data: R,
691        file_size: usize,
692        part_size: Option<usize>,
693        threads: Option<u8>,
694    ) -> Result<(), anyhow::Error> {
695        let initiate = self.initial_part_upload(key).await?;
696        let upload_id = initiate.upload_id;
697        let mut part_number = 0;
698        let mut upload_bytes = 0;
699        let mut handles = Vec::new();
700        // 单个 Part大小范围 1 MB - 1 GB,如果未指定part_size,默认5个线程
701        let mut part_size = match part_size {
702            Some(size) => size,
703            None => file_size / threads.unwrap_or(5) as usize,
704        };
705        part_size = part_size.clamp(PART_MIN_SIZE, PART_MAX_SIZE);
706        loop {
707            if upload_bytes >= file_size {
708                break;
709            }
710            let last_bytes = file_size - upload_bytes;
711            let mut part_size1 = part_size;
712            // 倒数第二次上传后剩余小于1M,附加到倒数第二次上传
713            if last_bytes < part_size + PART_MIN_SIZE && last_bytes < PART_MAX_SIZE {
714                part_size1 = last_bytes;
715            }
716            let mut buf = vec![0; part_size1];
717            data.read_exact(&mut buf).await?;
718            part_number += 1;
719            upload_bytes += part_size1;
720            let this = self.clone();
721            let key = key.to_string();
722            let upload_id = upload_id.clone();
723            let handle = tokio::spawn(async move {
724                // 尝试10次
725                let mut try_times = 10;
726                let mut resp = Err(anyhow!("发生异常"));
727                while try_times > 0 {
728                    resp = this
729                        .part_upload_no_progress_bar(
730                            &key,
731                            &upload_id,
732                            part_number,
733                            part_size1,
734                            buf.clone(),
735                        )
736                        .await;
737                    if resp.is_ok() {
738                        break;
739                    }
740                    try_times -= 1;
741                }
742                resp
743            });
744            handles.push(handle);
745        }
746        let mut parts = Vec::new();
747        for (i, handle) in handles.into_iter().enumerate() {
748            match handle.await? {
749                Ok(res) => {
750                    parts.push(CompletePartUploadParam {
751                        etag: res.etag.clone(),
752                        part_number: (i + 1) as i64,
753                    });
754                }
755                Err(e) => {
756                    self.part_abort(&key, &upload_id).await?;
757                    return Err(e);
758                }
759            }
760        }
761        if self.debug {
762            println!("parts: {:#?}", parts);
763        }
764        // complete part upload
765        self.complete_part_upload(key, &upload_id, parts).await?;
766        Ok(())
767    }
768
769    /// # 终止上传(分片)
770    /// <https://developer.qiniu.com/kodo/6367/abort-multipart-upload>
771    /// ## 参数
772    /// - key: 上传的key,如`test/Cargo.lock`
773    /// - upload_id: upload 任务 id
774    pub async fn part_abort(&self, key: &str, upload_id: &str) -> Result<(), anyhow::Error> {
775        let url = format!(
776            "{}/buckets/{}/objects/{}/uploads/{upload_id}",
777            self.region.get_upload_host(),
778            self.bucket,
779            self.get_base64encode_key(key)
780        );
781        let headers = self.get_part_headers(key);
782        let response = reqwest::Client::new()
783            .delete(url)
784            .headers(headers)
785            .send()
786            .await?;
787        if self.debug {
788            println!("part abort {} {}, {:#?}", key, upload_id, response);
789        }
790        Ok(())
791    }
792}