#![allow(dead_code)]
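//! Qiniu object storage uploader: form (single-request) upload and resumable
//! multipart upload, with optional progress bars behind the `progress-bar` feature.
//!
//! A minimal usage sketch (the crate path `qiniu_uploader` and the file names are
//! assumptions for illustration, not part of this file):
//!
//! ```ignore
//! use qiniu_uploader::{QiniuRegionEnum, QiniuUploader};
//! use tokio::fs::File;
//!
//! async fn demo() -> Result<(), anyhow::Error> {
//!     let uploader = QiniuUploader::new("ACCESS_KEY", "SECRET_KEY", "my-bucket", Some(QiniuRegionEnum::Z0), false);
//!     let file = File::open("local.bin").await?;
//!     let size = file.metadata().await?.len() as usize;
//!     // Multipart upload: default part size, 5 concurrent parts, default progress style.
//!     uploader.part_upload_file("remote/key.bin", file, size, None, Some(5), None).await?;
//!     Ok(())
//! }
//! ```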
#![cfg_attr(feature = "docs", feature(doc_cfg))]

pub use anyhow::anyhow;
use serde::{Deserialize, Serialize};
use std::{collections::HashMap, str::FromStr};

use base64::prelude::*;

#[cfg(feature = "progress-bar")]
use indicatif::{MultiProgress, ProgressBar, ProgressStyle};

#[cfg(feature = "progress-bar")]
use futures_util::TryStreamExt;

use mime::{Mime, APPLICATION_JSON, APPLICATION_OCTET_STREAM};
use reqwest::{
    header::{HeaderMap, HeaderName, HeaderValue, AUTHORIZATION, CONTENT_LENGTH, CONTENT_TYPE},
    multipart::{self, Form},
    Body,
};
use tokio::io::AsyncReadExt;

#[cfg(feature = "progress-bar")]
use tokio_util::io::ReaderStream;

#[cfg(feature = "progress-bar")]
pub use indicatif;

#[cfg(feature = "progress-bar")]
use std::io::Cursor;

#[cfg(feature = "progress-bar")]
use tokio::io::BufReader;

pub use mime;

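/// Client for uploading objects to Qiniu Cloud Storage.
/// Holds the access/secret key pair, the target bucket, the upload region,
/// and a `debug` flag that enables verbose logging to stdout.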
#[derive(Debug, Clone)]
pub struct QiniuUploader {
    access_key: String,
    secret_key: String,
    bucket: String,
    region: QiniuRegionEnum,
    debug: bool,
}

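/// Qiniu storage regions; each maps to an upload host, see `get_upload_host`.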
#[derive(Debug, Clone, Copy)]
pub enum QiniuRegionEnum {
    Z0,
    CNEast2,
    Z1,
    Z2,
    NA0,
    AS0,
    APSouthEast2,
    APSouthEast3,
}

impl QiniuRegionEnum {
    pub fn get_upload_host(&self) -> String {
        match self {
            Self::Z0 => String::from("https://up-z0.qiniup.com"),
            Self::Z1 => String::from("https://up-z1.qiniup.com"),
            Self::Z2 => String::from("https://up-z2.qiniup.com"),
            Self::NA0 => String::from("https://up-na0.qiniup.com"),
            Self::AS0 => String::from("https://up-as0.qiniup.com"),
            Self::APSouthEast2 => String::from("https://up-ap-southeast-2.qiniup.com"),
            Self::APSouthEast3 => String::from("https://up-ap-southeast-3.qiniup.com"),
            Self::CNEast2 => String::from("https://up-cn-east-2.qiniup.com"),
        }
    }
}

impl FromStr for QiniuRegionEnum {
    type Err = anyhow::Error;

    fn from_str(region: &str) -> Result<Self, Self::Err> {
        let region = match region {
            "z0" => Self::Z0,
            "cn-east-2" => Self::CNEast2,
            "z1" => Self::Z1,
            "z2" => Self::Z2,
            "na0" => Self::NA0,
            "as0" => Self::AS0,
            "ap-southeast-2" => Self::APSouthEast2,
            "ap-southeast-3" => Self::APSouthEast3,
            _ => return Err(anyhow!("Unknown region: {}", region)),
        };
        Ok(region)
    }
}

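/// Response from the "initiate multipart upload" API (`POST .../uploads`).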
#[derive(Debug, Deserialize)]
struct InitialPartUploadResponse {
    #[serde(rename = "uploadId")]
    pub upload_id: String,

    #[serde(rename = "expireAt")]
    pub expire_at: i64,
}

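/// Response from uploading a single part (`PUT .../uploads/{uploadId}/{partNumber}`).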
#[derive(Debug, Deserialize, Default)]
struct PartUploadResponse {
    pub etag: String,
    pub md5: String,
}

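/// One entry of the body sent to the "complete multipart upload" API.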
#[derive(Debug, Serialize)]
struct CompletePartUploadParam {
    pub etag: String,

    #[serde(rename = "partNumber")]
    pub part_number: i64,
}

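/// Minimum part size for multipart uploads (1 MiB).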
const PART_MIN_SIZE: usize = 1024 * 1024;

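/// Maximum part size for multipart uploads (1 GiB).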
const PART_MAX_SIZE: usize = 1024 * 1024 * 1024;

impl QiniuUploader {
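    /// Create a new uploader. `region` falls back to `QiniuRegionEnum::Z0` when `None`;
    /// `debug` enables verbose logging of requests and responses to stdout.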
    pub fn new(
        access_key: impl Into<String>,
        secret_key: impl Into<String>,
        bucket: impl Into<String>,
        region: Option<QiniuRegionEnum>,
        debug: bool,
    ) -> Self {
        let region = region.unwrap_or(QiniuRegionEnum::Z0);
        Self {
            access_key: access_key.into(),
            secret_key: secret_key.into(),
            bucket: bucket.into(),
            region,
            debug,
        }
    }

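    /// Build an upload token for `key`: the put policy (1 GiB size limit, one hour
    /// deadline) is URL-safe base64 encoded and signed with HMAC-SHA1.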
    fn get_upload_token(&self, key: &str) -> String {
        let deadline = chrono::Local::now().timestamp() + 3600;
        let put_policy = r#"{"scope": "{bucket}:{key}", "deadline": {deadline}, "fsizeLimit": 1073741824, "returnBody": "{\"hash\": $(etag), \"key\": $(key)}"}"#;
        let put_policy = put_policy
            .replace("{bucket}", &self.bucket)
            .replace("{deadline}", &deadline.to_string())
            .replace("{key}", key);
        let mut buf = String::new();
        BASE64_URL_SAFE.encode_string(put_policy, &mut buf);
        let hmac_digest = hmac_sha1::hmac_sha1(self.secret_key.as_bytes(), buf.as_bytes());
        let mut sign = String::new();
        BASE64_URL_SAFE.encode_string(hmac_digest, &mut sign);
        let token = format!("{}:{sign}:{buf}", self.access_key);
        if self.debug {
            println!("key: {}, token: {}", key, token);
        }
        token
    }

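    /// Build the multipart form for a form upload: the file part plus the
    /// `key`, `token` and `filename` text fields.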
    fn make_multi_part<T: Into<Body>>(&self, key: &str, body: T, mime: Mime) -> Form {
        let token = self.get_upload_token(key);
        let mut headers = HeaderMap::new();
        headers.insert(
            HeaderName::from_str("Content-Type").unwrap(),
            HeaderValue::from_str(mime.essence_str()).unwrap(),
        );
        let file_name = key.split("/").last().unwrap().to_string();
        multipart::Form::new()
            .part(
                "file",
                multipart::Part::stream(body)
                    .file_name(file_name.clone())
                    .headers(headers)
                    .mime_str(mime.essence_str())
                    .unwrap(),
            )
            .text("key", key.to_string())
            .text("token", token)
            .text("filename", file_name)
    }

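    /// Form-upload `data` to `key` while drawing a progress bar. `file_size` only
    /// sizes the bar; `progress_style` falls back to a default bar style when `None`.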
    #[cfg_attr(feature = "docs", doc(cfg(feature = "progress-bar")))]
    #[cfg(feature = "progress-bar")]
    pub async fn upload_file<R: AsyncReadExt + Unpin + Send + Sync + 'static>(
        &self,
        key: &str,
        data: R,
        mime: Mime,
        file_size: usize,
        progress_style: Option<ProgressStyle>,
    ) -> Result<(), anyhow::Error> {
        let reader = ReaderStream::new(data);
        let pb = ProgressBar::new(file_size as u64);
        let sty = match progress_style {
            Some(sty) => sty,
            None => ProgressStyle::default_bar()
                .template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({bytes_per_sec}, {eta})")
                .unwrap()
                .progress_chars("#>-"),
        };
        pb.set_style(sty);
        let pb1 = pb.clone();
        let stream = reader.inspect_ok(move |chunk| {
            pb1.inc(chunk.len() as u64);
        });
        let body = Body::wrap_stream(stream);
        let resp = self.upload_file_no_progress_bar(key, body, mime).await;
        pb.finish();
        resp
    }

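    /// Form-upload `data` to `key` in a single multipart POST, without a progress bar.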
    #[cfg_attr(feature = "docs", doc(cfg(feature = "progress-bar")))]
    #[cfg(feature = "progress-bar")]
    pub async fn upload_file_no_progress_bar<T: Into<Body>>(
        &self,
        key: &str,
        data: T,
        mime: Mime,
    ) -> Result<(), anyhow::Error> {
        let form = self.make_multi_part(key, data, mime);
        let response = reqwest::Client::new()
            .post(self.region.get_upload_host())
            .multipart(form)
            .send()
            .await?;
        if !response.status().is_success() {
            return Err(anyhow!(
                "Failed to upload file: {} {}",
                response.status().as_u16(),
                response
                    .text()
                    .await
                    .unwrap_or_else(|_| "Unknown error".to_string())
            ));
        }
        if self.debug {
            println!("upload_file response: {:#?}", response);
        }
        Ok(())
    }

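    /// Form-upload `data` to `key` (single multipart POST). Available when the
    /// `progress-bar` feature is disabled.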
    #[cfg_attr(feature = "docs", doc(cfg(not(feature = "progress-bar"))))]
    #[cfg(not(feature = "progress-bar"))]
    pub async fn upload_file<T: Into<Body>>(
        &self,
        key: &str,
        data: T,
        mime: Mime,
    ) -> Result<(), anyhow::Error> {
        let form = self.make_multi_part(key, data, mime);
        let response = reqwest::Client::new()
            .post(self.region.get_upload_host())
            .multipart(form)
            .send()
            .await?;
        if !response.status().is_success() {
            return Err(anyhow!(
                "Failed to upload file: {} {}",
                response.status().as_u16(),
                response
                    .text()
                    .await
                    .unwrap_or_else(|_| "Unknown error".to_string())
            ));
        }
        if self.debug {
            println!("upload_file response: {:#?}", response);
        }
        Ok(())
    }

    fn get_part_upload_token(&self, key: &str) -> String {
        format!("UpToken {}", self.get_upload_token(key))
    }

    fn get_base64encode_key(&self, key: &str) -> String {
        let mut res = String::new();
        BASE64_URL_SAFE.encode_string(key, &mut res);
        res
    }

    fn get_part_headers(&self, key: &str) -> HeaderMap {
        let mut headers = HeaderMap::new();
        headers.insert(
            AUTHORIZATION,
            HeaderValue::from_str(&self.get_part_upload_token(key)).unwrap(),
        );
        headers
    }

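    /// Initiate a multipart upload for `key` and return the `uploadId`.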
    async fn initial_part_upload(
        &self,
        key: &str,
    ) -> Result<InitialPartUploadResponse, anyhow::Error> {
        let url = format!(
            "{}/buckets/{}/objects/{}/uploads",
            self.region.get_upload_host(),
            self.bucket,
            self.get_base64encode_key(key),
        );
        let headers = self.get_part_headers(key);
        let response = reqwest::Client::new()
            .post(url)
            .headers(headers)
            .send()
            .await?
            .json::<InitialPartUploadResponse>()
            .await?;
        if self.debug {
            println!("initial_part_upload response: {:#?}", response);
        }
        Ok(response)
    }
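
    /// Upload one part of `key` (PUT, raw octet-stream body) without a progress bar.
    /// `part_number` starts at 1; `file_size` is the byte length of this part.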
    async fn part_upload_no_progress_bar<T: Into<Body>>(
        &self,
        key: &str,
        upload_id: &str,
        part_number: i32,
        file_size: usize,
        data: T,
    ) -> Result<PartUploadResponse, anyhow::Error> {
        let url = format!(
            "{}/buckets/{}/objects/{}/uploads/{upload_id}/{part_number}",
            self.region.get_upload_host(),
            self.bucket,
            self.get_base64encode_key(key),
        );
        let mut headers = self.get_part_headers(key);
        headers.insert(
            CONTENT_TYPE,
            HeaderValue::from_str(APPLICATION_OCTET_STREAM.essence_str()).unwrap(),
        );
        headers.insert(
            CONTENT_LENGTH,
            HeaderValue::from_str(&file_size.to_string()).unwrap(),
        );
        let response = reqwest::Client::new()
            .put(url)
            .headers(headers)
            .body(data)
            .send()
            .await?
            .json::<PartUploadResponse>()
            .await;
        let response = match response {
            Ok(response) => response,
            Err(e) => {
                return Err(anyhow!("Part upload request failed: {}", e.to_string()));
            }
        };
        if self.debug {
            println!("part_upload response: {:#?}", response);
        }
        Ok(response)
    }

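    /// Upload one part while advancing the given progress bar by the bytes streamed.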
    #[cfg_attr(feature = "docs", doc(cfg(feature = "progress-bar")))]
    #[cfg(feature = "progress-bar")]
    async fn part_upload(
        &self,
        key: &str,
        upload_id: &str,
        part_number: i32,
        data: Vec<u8>,
        pb: ProgressBar,
    ) -> Result<PartUploadResponse, anyhow::Error> {
        let size = data.len();
        let reader = ReaderStream::new(BufReader::new(Cursor::new(data)));
        let pb1 = pb.clone();
        let stream = reader.inspect_ok(move |chunk| {
            pb1.inc(chunk.len() as u64);
        });
        let body = Body::wrap_stream(stream);
        let resp = self
            .part_upload_no_progress_bar(key, upload_id, part_number, size, body)
            .await;
        pb.finish();
        resp
    }

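    /// Complete a multipart upload by sending the collected etags and part numbers.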
    async fn complete_part_upload(
        &self,
        key: &str,
        upload_id: &str,
        parts: Vec<CompletePartUploadParam>,
    ) -> Result<(), anyhow::Error> {
        let url = format!(
            "{}/buckets/{}/objects/{}/uploads/{upload_id}",
            self.region.get_upload_host(),
            self.bucket,
            self.get_base64encode_key(key)
        );
        let mut headers = self.get_part_headers(key);
        headers.insert(
            CONTENT_TYPE,
            APPLICATION_JSON.essence_str().try_into().unwrap(),
        );
        let mut data = HashMap::new();
        data.insert("parts", parts);
        let response = reqwest::Client::new()
            .post(url)
            .json(&data)
            .headers(headers)
            .send()
            .await?;
        if self.debug {
            println!("complete_part_upload response: {:#?}", response);
        }
        if !response.status().is_success() {
            return Err(anyhow!(
                "Failed to complete_part_upload: {} {}",
                response.status().as_u16(),
                response
                    .text()
                    .await
                    .unwrap_or_else(|_| "Unknown error".to_string())
            ));
        }
        Ok(())
    }

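    /// Resumable multipart upload of `data` to `key`, one progress bar per part.
    /// Parts are sized from `part_size` (or `file_size / threads`, default 5), clamped
    /// to 1 MiB..=1 GiB, uploaded concurrently and retried up to 10 times each; on
    /// failure the whole upload is aborted.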
    #[cfg_attr(feature = "docs", doc(cfg(feature = "progress-bar")))]
    #[cfg(feature = "progress-bar")]
    pub async fn part_upload_file<R: AsyncReadExt + Unpin + Send + Sync + 'static>(
        self,
        key: &str,
        mut data: R,
        file_size: usize,
        part_size: Option<usize>,
        threads: Option<u8>,
        progress_style: Option<ProgressStyle>,
    ) -> Result<(), anyhow::Error> {
        let initiate = self.initial_part_upload(key).await?;
        let upload_id = initiate.upload_id;
        let mut part_number = 0;
        let mut upload_bytes = 0;
        let mut handles = Vec::new();
        let multi = MultiProgress::new();
        let sty = match progress_style {
            Some(sty) => sty,
            None => ProgressStyle::default_bar()
                .template("{spinner:.green} [{elapsed_precise}] [{wide_bar:.cyan/blue}] {bytes}/{total_bytes} ({bytes_per_sec}, {eta})")
                .unwrap()
                .progress_chars("#>-"),
        };
        let mut part_size = match part_size {
            Some(size) => size,
            None => file_size / threads.unwrap_or(5) as usize,
        };
        part_size = part_size.clamp(PART_MIN_SIZE, PART_MAX_SIZE);
        loop {
            if upload_bytes >= file_size {
                break;
            }
            let last_bytes = file_size - upload_bytes;
            let mut part_size1 = part_size;
            if last_bytes < part_size + PART_MIN_SIZE && last_bytes < PART_MAX_SIZE {
                part_size1 = last_bytes;
            }
            let mut buf = vec![0; part_size1];
            data.read_exact(&mut buf).await?;
            part_number += 1;
            upload_bytes += part_size1;
            let this = self.clone();
            let key = key.to_string();
            let upload_id = upload_id.clone();
            let pb = multi.add(ProgressBar::new(buf.len() as u64));
            pb.set_style(sty.clone());
            let handle = tokio::spawn(async move {
                // Retry this part up to 10 times before giving up.
                let mut try_times = 10;
                let mut resp = Err(anyhow!("part upload failed"));
                while try_times > 0 {
                    resp = this
                        .part_upload(&key, &upload_id, part_number, buf.clone(), pb.clone())
                        .await;
                    if resp.is_ok() {
                        break;
                    }
                    try_times -= 1;
                }
                resp
            });
            handles.push(handle);
        }
        let mut parts = Vec::new();
        for (i, handle) in handles.into_iter().enumerate() {
            match handle.await? {
                Ok(res) => {
                    parts.push(CompletePartUploadParam {
                        etag: res.etag.clone(),
                        part_number: (i + 1) as i64,
                    });
                }
                Err(e) => {
                    self.part_abort(key, &upload_id).await?;
                    return Err(e);
                }
            }
        }
        if self.debug {
            println!("parts: {:#?}", parts);
        }
        self.complete_part_upload(key, &upload_id, parts).await?;
        Ok(())
    }

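    /// Resumable multipart upload of `data` to `key` without progress bars; same
    /// part sizing, concurrency and retry behaviour as `part_upload_file`.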
    #[cfg_attr(feature = "docs", doc(cfg(feature = "progress-bar")))]
    #[cfg(feature = "progress-bar")]
    pub async fn part_upload_file_no_progress_bar<
        R: AsyncReadExt + Unpin + Send + Sync + 'static,
    >(
        self,
        key: &str,
        mut data: R,
        file_size: usize,
        part_size: Option<usize>,
        threads: Option<u8>,
    ) -> Result<(), anyhow::Error> {
        let initiate = self.initial_part_upload(key).await?;
        let upload_id = initiate.upload_id;
        let mut part_number = 0;
        let mut upload_bytes = 0;
        let mut handles = Vec::new();
        let mut part_size = match part_size {
            Some(size) => size,
            None => file_size / threads.unwrap_or(5) as usize,
        };
        part_size = part_size.clamp(PART_MIN_SIZE, PART_MAX_SIZE);
        loop {
            if upload_bytes >= file_size {
                break;
            }
            let last_bytes = file_size - upload_bytes;
            let mut part_size1 = part_size;
            if last_bytes < part_size + PART_MIN_SIZE && last_bytes < PART_MAX_SIZE {
                part_size1 = last_bytes;
            }
            let mut buf = vec![0; part_size1];
            data.read_exact(&mut buf).await?;
            part_number += 1;
            upload_bytes += part_size1;
            let this = self.clone();
            let key = key.to_string();
            let upload_id = upload_id.clone();
            let handle = tokio::spawn(async move {
                // Retry this part up to 10 times before giving up.
                let mut try_times = 10;
                let mut resp = Err(anyhow!("part upload failed"));
                while try_times > 0 {
                    resp = this
                        .part_upload_no_progress_bar(
                            &key,
                            &upload_id,
                            part_number,
                            part_size1,
                            buf.clone(),
                        )
                        .await;
                    if resp.is_ok() {
                        break;
                    }
                    try_times -= 1;
                }
                resp
            });
            handles.push(handle);
        }
        let mut parts = Vec::new();
        for (i, handle) in handles.into_iter().enumerate() {
            match handle.await? {
                Ok(res) => {
                    parts.push(CompletePartUploadParam {
                        etag: res.etag.clone(),
                        part_number: (i + 1) as i64,
                    });
                }
                Err(e) => {
                    self.part_abort(key, &upload_id).await?;
                    return Err(e);
                }
            }
        }
        if self.debug {
            println!("parts: {:#?}", parts);
        }
        self.complete_part_upload(key, &upload_id, parts).await?;
        Ok(())
    }

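    /// Resumable multipart upload of `data` to `key`. Available when the
    /// `progress-bar` feature is disabled; same part sizing, concurrency and
    /// retry behaviour as the feature-gated variant.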
    #[cfg_attr(feature = "docs", doc(cfg(not(feature = "progress-bar"))))]
    #[cfg(not(feature = "progress-bar"))]
    pub async fn part_upload_file<R: AsyncReadExt + Unpin + Send + Sync + 'static>(
        self,
        key: &str,
        mut data: R,
        file_size: usize,
        part_size: Option<usize>,
        threads: Option<u8>,
    ) -> Result<(), anyhow::Error> {
        let initiate = self.initial_part_upload(key).await?;
        let upload_id = initiate.upload_id;
        let mut part_number = 0;
        let mut upload_bytes = 0;
        let mut handles = Vec::new();
        let mut part_size = match part_size {
            Some(size) => size,
            None => file_size / threads.unwrap_or(5) as usize,
        };
        part_size = part_size.clamp(PART_MIN_SIZE, PART_MAX_SIZE);
        loop {
            if upload_bytes >= file_size {
                break;
            }
            let last_bytes = file_size - upload_bytes;
            let mut part_size1 = part_size;
            if last_bytes < part_size + PART_MIN_SIZE && last_bytes < PART_MAX_SIZE {
                part_size1 = last_bytes;
            }
            let mut buf = vec![0; part_size1];
            data.read_exact(&mut buf).await?;
            part_number += 1;
            upload_bytes += part_size1;
            let this = self.clone();
            let key = key.to_string();
            let upload_id = upload_id.clone();
            let handle = tokio::spawn(async move {
                // Retry this part up to 10 times before giving up.
                let mut try_times = 10;
                let mut resp = Err(anyhow!("part upload failed"));
                while try_times > 0 {
                    resp = this
                        .part_upload_no_progress_bar(
                            &key,
                            &upload_id,
                            part_number,
                            part_size1,
                            buf.clone(),
                        )
                        .await;
                    if resp.is_ok() {
                        break;
                    }
                    try_times -= 1;
                }
                resp
            });
            handles.push(handle);
        }
        let mut parts = Vec::new();
        for (i, handle) in handles.into_iter().enumerate() {
            match handle.await? {
                Ok(res) => {
                    parts.push(CompletePartUploadParam {
                        etag: res.etag.clone(),
                        part_number: (i + 1) as i64,
                    });
                }
                Err(e) => {
                    self.part_abort(key, &upload_id).await?;
                    return Err(e);
                }
            }
        }
        if self.debug {
            println!("parts: {:#?}", parts);
        }
        self.complete_part_upload(key, &upload_id, parts).await?;
        Ok(())
    }

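    /// Abort a multipart upload, discarding any parts already uploaded for `upload_id`.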
    pub async fn part_abort(&self, key: &str, upload_id: &str) -> Result<(), anyhow::Error> {
        let url = format!(
            "{}/buckets/{}/objects/{}/uploads/{upload_id}",
            self.region.get_upload_host(),
            self.bucket,
            self.get_base64encode_key(key)
        );
        let headers = self.get_part_headers(key);
        let response = reqwest::Client::new()
            .delete(url)
            .headers(headers)
            .send()
            .await?;
        if self.debug {
            println!("part abort {} {}, {:#?}", key, upload_id, response);
        }
        Ok(())
    }
}