1use super::errors::Error;
2use bytes::Bytes;
3use chrono::prelude::*;
4use quick_xml::{events::Event, Reader};
5use reqwest::header::{HeaderMap, CONTENT_LENGTH, DATE, ETAG};
6use reqwest::Client;
7use serde_derive::{Deserialize, Serialize};
8use serde_xml_rs::{from_str, to_string};
9use std::collections::HashMap;
10use std::str;
11
12use crate::bucket::{Bucket, ListBuckets};
13use crate::errors::ObjectError;
14
15use super::auth::*;
16use super::utils::*;
17
18#[derive(Clone, Debug)]
19pub struct OSS {
20 key_id: String,
21 key_secret: String,
22 endpoint: String,
23 bucket: String,
24 pub client: Client,
25}
26
/// Query-parameter names that `get_resources_str` keeps; keys that are not
/// listed here are filtered out of the resource string.
///
/// Declared as a slice so the length does not have to be maintained by hand,
/// and each name appears exactly once.
const RESOURCES: &[&str] = &[
    "acl",
    "uploads",
    "location",
    "cors",
    "logging",
    "website",
    "referer",
    "lifecycle",
    "delete",
    "append",
    "tagging",
    "objectMeta",
    "uploadId",
    "partNumber",
    "security-token",
    "position",
    "img",
    "style",
    "styleName",
    "replication",
    "replicationProgress",
    "replicationLocation",
    "cname",
    "bucketInfo",
    "comp",
    "qos",
    "live",
    "status",
    "vod",
    "startTime",
    "endTime",
    "symlink",
    "x-oss-process",
    "response-content-type",
    "response-content-language",
    "response-expires",
    "response-cache-control",
    "response-content-disposition",
    "response-content-encoding",
    "udf",
    "udfName",
    "udfImage",
    "udfId",
    "udfImageDesc",
    "udfApplication",
    "udfApplicationLog",
    "restore",
    "callback",
    "callback-var",
];
79
80impl OSS {
81 pub fn new(key_id: String, key_secret: String, endpoint: String, bucket: String) -> Self {
82 OSS {
83 key_id: key_id,
84 key_secret: key_secret,
85 endpoint: endpoint,
86 bucket: bucket,
87 client: reqwest::Client::new(),
88 }
89 }
90
91 pub fn bucket(&self) -> &str {
92 &self.bucket
93 }
94
95 pub fn endpoint(&self) -> &str {
96 &self.endpoint
97 }
98
99 pub fn key_id(&self) -> &str {
100 &self.key_id
101 }
102
103 pub fn key_secret(&self) -> &str {
104 &self.key_secret
105 }
106
107 pub fn set_bucket(&mut self, bucket: &str) {
108 self.bucket = bucket.to_string()
109 }
110
111 pub fn host(&self, bucket: &str, object: &str, resources_str: &str) -> String {
112 if self.endpoint.starts_with("https") {
113 format!(
114 "https://{}.{}/{}?{}",
115 bucket,
116 self.endpoint.replacen("https://", "", 1),
117 object,
118 resources_str
119 )
120 } else {
121 format!(
122 "http://{}.{}/{}?{}",
123 bucket,
124 self.endpoint.replacen("http://", "", 1),
125 object,
126 resources_str
127 )
128 }
129 }
130
131 pub fn date(&self) -> String {
132 let now: DateTime<Utc> = Utc::now();
133 now.format("%a, %d %b %Y %T GMT").to_string()
134 }
135
136 pub fn get_resources_str<S>(&self, params: HashMap<S, Option<S>>) -> String
137 where
138 S: AsRef<str>,
139 {
140 let mut resources: Vec<(&S, &Option<S>)> = params
141 .iter()
142 .filter(|(k, _)| RESOURCES.contains(&k.as_ref()))
143 .collect();
144 resources.sort_by(|a, b| a.0.as_ref().to_string().cmp(&b.0.as_ref().to_string()));
145 let mut result = String::new();
146 for (k, v) in resources {
147 if !result.is_empty() {
148 result += "&";
149 }
150 if let Some(vv) = v {
151 result += &format!("{}={}", k.as_ref().to_owned(), vv.as_ref());
152 } else {
153 result += k.as_ref();
154 }
155 }
156 result
157 }
158
159 pub async fn list_bucket<S, R>(&self, resources: R) -> Result<ListBuckets, Error>
160 where
161 S: AsRef<str>,
162 R: Into<Option<HashMap<S, Option<S>>>>,
163 {
164 let resources_str = if let Some(r) = resources.into() {
165 self.get_resources_str(r)
166 } else {
167 String::new()
168 };
169 let host = self.endpoint();
170 let date = self.date();
171
172 let mut headers = HeaderMap::new();
173 headers.insert(DATE, date.parse()?);
174 let authorization = self.oss_sign(
175 "GET",
176 self.key_id(),
177 self.key_secret(),
178 "",
179 "",
180 &resources_str,
181 &headers,
182 );
183 headers.insert("Authorization", authorization.parse()?);
184
185 let resp = self.client.get(host).headers(headers).send().await?;
186
187 let xml_str = resp.text().await?;
188 let mut result = Vec::new();
189 let mut reader = Reader::from_str(xml_str.as_str());
190 reader.trim_text(true);
191 let mut buf = Vec::new();
192
193 let mut prefix = String::new();
194 let mut marker = String::new();
195 let mut max_keys = String::new();
196 let mut is_truncated = false;
197 let mut next_marker = String::new();
198 let mut id = String::new();
199 let mut display_name = String::new();
200
201 let mut name = String::new();
202 let mut location = String::new();
203 let mut create_date = String::new();
204 let mut extranet_endpoint = String::new();
205 let mut intranet_endpoint = String::new();
206 let mut storage_class = String::new();
207
208 let list_buckets;
209
210 loop {
211 match reader.read_event(&mut buf) {
212 Ok(Event::Start(ref e)) => match e.name() {
213 b"Prefix" => prefix = reader.read_text(e.name(), &mut Vec::new())?,
214 b"Marker" => marker = reader.read_text(e.name(), &mut Vec::new())?,
215 b"MaxKeys" => max_keys = reader.read_text(e.name(), &mut Vec::new())?,
216 b"IsTruncated" => {
217 is_truncated = reader.read_text(e.name(), &mut Vec::new())? == "true"
218 }
219 b"NextMarker" => next_marker = reader.read_text(e.name(), &mut Vec::new())?,
220 b"ID" => id = reader.read_text(e.name(), &mut Vec::new())?,
221 b"DisplayName" => display_name = reader.read_text(e.name(), &mut Vec::new())?,
222
223 b"Bucket" => {
224 name = String::new();
225 location = String::new();
226 create_date = String::new();
227 extranet_endpoint = String::new();
228 intranet_endpoint = String::new();
229 storage_class = String::new();
230 }
231
232 b"Name" => name = reader.read_text(e.name(), &mut Vec::new())?,
233 b"CreationDate" => create_date = reader.read_text(e.name(), &mut Vec::new())?,
234 b"ExtranetEndpoint" => {
235 extranet_endpoint = reader.read_text(e.name(), &mut Vec::new())?
236 }
237 b"IntranetEndpoint" => {
238 intranet_endpoint = reader.read_text(e.name(), &mut Vec::new())?
239 }
240 b"Location" => location = reader.read_text(e.name(), &mut Vec::new())?,
241 b"StorageClass" => {
242 storage_class = reader.read_text(e.name(), &mut Vec::new())?
243 }
244 _ => (),
245 },
246 Ok(Event::End(ref e)) if e.name() == b"Bucket" => {
247 let bucket = Bucket::new(
248 name.clone(),
249 create_date.clone(),
250 location.clone(),
251 extranet_endpoint.clone(),
252 intranet_endpoint.clone(),
253 storage_class.clone(),
254 );
255 result.push(bucket);
256 }
257 Ok(Event::Eof) => {
258 list_buckets = ListBuckets::new(
259 prefix,
260 marker,
261 max_keys,
262 is_truncated,
263 next_marker,
264 id,
265 display_name,
266 result,
267 );
268 break;
269 } Err(e) => panic!("Error at position {}: {:?}", reader.buffer_position(), e),
271 _ => (), }
273 buf.clear();
274 }
275 Ok(list_buckets)
276 }
277
278 pub async fn get_object<S>(
279 &self,
280 object: S,
281 headers: Option<HashMap<S, S>>,
282 resources: Option<HashMap<S, Option<S>>>,
283 ) -> Result<Bytes, reqwest::Error>
284 where
285 S: AsRef<str>,
286 {
287 let object = object.as_ref();
288 let resources_str = if let Some(r) = resources {
289 self.get_resources_str(r)
290 } else {
291 String::new()
292 };
293 let host = self.host(self.bucket(), object, &resources_str);
294 let date = self.date();
295 let mut headers = if let Some(h) = headers {
296 to_headers(h).unwrap()
297 } else {
298 HeaderMap::new()
299 };
300 headers.insert(DATE, date.parse().unwrap());
301 let authorization = self.oss_sign(
302 "GET",
303 self.key_id(),
304 self.key_secret(),
305 self.bucket(),
306 object,
307 &resources_str,
308 &headers,
309 );
310 headers.insert("Authorization", authorization.parse().unwrap());
311
312 let res = reqwest::Client::new()
313 .get(&host)
314 .headers(headers)
315 .send()
316 .await?;
317 Ok(res.bytes().await?)
318 }
319
320 pub async fn head_object<S>(
321 &self,
322 object: S,
323 headers: Option<HashMap<S, S>>,
324 resources: Option<HashMap<S, Option<S>>>,
325 ) -> Result<HeaderMap, reqwest::Error>
326 where
327 S: AsRef<str>,
328 {
329 let object = object.as_ref();
330 let resources_str = if let Some(r) = resources {
331 self.get_resources_str(r)
332 } else {
333 String::new()
334 };
335 let host = self.host(self.bucket(), object, &resources_str);
336 let date = self.date();
337 let mut headers = if let Some(h) = headers {
338 to_headers(h).unwrap()
339 } else {
340 HeaderMap::new()
341 };
342 headers.insert(DATE, date.parse().unwrap());
343 let authorization = self.oss_sign(
344 "HEAD",
345 self.key_id(),
346 self.key_secret(),
347 self.bucket(),
348 object,
349 &resources_str,
350 &headers,
351 );
352 headers.insert("Authorization", authorization.parse().unwrap());
353
354 let res = reqwest::Client::new()
355 .head(&host)
356 .headers(headers)
357 .send()
358 .await?;
359 Ok(res.headers().clone())
360 }
361
362 pub async fn put_object_from_buffer<S1, S2, H, R>(
363 &self,
364 buf: &[u8],
365 object: S1,
366 headers: H,
367 resources: R,
368 ) -> Result<Bytes, reqwest::Error>
369 where
370 S1: AsRef<str>,
371 S2: AsRef<str>,
372 H: Into<Option<HashMap<S2, S2>>>,
373 R: Into<Option<HashMap<S2, Option<S2>>>>,
374 {
375 let object = object.as_ref();
376 let resources_str = if let Some(r) = resources.into() {
377 self.get_resources_str(r)
378 } else {
379 String::new()
380 };
381 let host = self.host(self.bucket(), object, &resources_str);
382 let date = self.date();
383
384 let mut headers = if let Some(h) = headers.into() {
385 to_headers(h).unwrap()
386 } else {
387 HeaderMap::new()
388 };
389 headers.insert(DATE, date.parse().unwrap());
390 let authorization = self.oss_sign(
391 "PUT",
392 self.key_id(),
393 self.key_secret(),
394 self.bucket(),
395 object,
396 &resources_str,
397 &headers,
398 );
399 headers.insert("Authorization", authorization.parse().unwrap());
400
401 let res = reqwest::Client::new()
402 .put(&host)
403 .headers(headers)
404 .body(buf.to_owned())
405 .send()
406 .await?;
407 Ok(res.bytes().await?)
408 }
409
410 pub async fn put_object_from_file<S1, S2, S3, H, R>(
411 &self,
412 file: S1,
413 object_name: S2,
414 headers: H,
415 resources: R,
416 ) -> Result<(), Error>
417 where
418 S1: AsRef<str>,
419 S2: AsRef<str>,
420 S3: AsRef<str>,
421 H: Into<Option<HashMap<S3, S3>>>,
422 R: Into<Option<HashMap<S3, Option<S3>>>>,
423 {
424 let object_name = object_name.as_ref();
425 let resources_str = if let Some(r) = resources.into() {
426 self.get_resources_str(r)
427 } else {
428 String::new()
429 };
430 let host = self.host(self.bucket(), object_name, &resources_str);
431 let date = self.date();
432 let buf = load_file(file)?;
433 let mut headers = if let Some(h) = headers.into() {
434 to_headers(h)?
435 } else {
436 HeaderMap::new()
437 };
438 headers.insert(DATE, date.parse()?);
439 headers.insert(CONTENT_LENGTH, buf.len().to_string().parse()?);
440 let authorization = self.oss_sign(
441 "PUT",
442 self.key_id(),
443 self.key_secret(),
444 self.bucket(),
445 object_name,
446 &resources_str,
447 &headers,
448 );
449 headers.insert("Authorization", authorization.parse()?);
450
451 let resp = self
452 .client
453 .put(&host)
454 .headers(headers)
455 .body(buf)
456 .send()
457 .await?;
458
459 if resp.status().is_success() {
460 Ok(())
461 } else {
462 Err(Error::Object(ObjectError::PutError {
463 msg: format!("can not put object, reason: {:?}", resp.text().await).into(),
464 }))
465 }
466 }
467
468 async fn initiate_multipart_upload<S2, S3, H>(
470 &self,
471 object_name: S2,
472 headers: H,
473 ) -> Result<String, Error>
474 where
475 S2: AsRef<str>,
476 S3: AsRef<str>,
477 H: Into<Option<HashMap<S3, S3>>>,
478 {
479 let object_name = object_name.as_ref();
480 let resources_str = "uploads";
481
482 let host = self.host(self.bucket(), object_name, resources_str);
483 let date = self.date();
484 let mut headers = if let Some(h) = headers.into() {
485 to_headers(h)?
486 } else {
487 HeaderMap::new()
488 };
489 headers.insert(DATE, date.parse()?);
490 let authorization = self.oss_sign(
491 "POST",
492 self.key_id(),
493 self.key_secret(),
494 self.bucket(),
495 object_name,
496 resources_str,
497 &headers,
498 );
499 headers.insert("Authorization", authorization.parse()?);
500
501 let resp = self.client.post(&host).headers(headers).send().await?;
502
503 if resp.status().is_success() {
504 #[derive(Debug, Serialize, Deserialize, PartialEq)]
505 struct InitiateMultipartUploadResult {
506 Bucket: String,
507 Key: String,
508 UploadId: String,
509 }
510
511 let init: InitiateMultipartUploadResult =
512 from_str(&resp.text().await.unwrap()).unwrap();
513 Ok(init.UploadId)
514 } else {
515 Err(Error::Object(ObjectError::PutError {
516 msg: format!("can not put object, reason: {:?}", resp.text().await).into(),
517 }))
518 }
519 }
520
521 async fn upload_part<S1, S2, S3, H>(
523 &self,
524 file: S1,
525 object_name: S2,
526 chunk: FileChunk,
527 upload_id: String,
528 headers: H,
529 ) -> Result<String, Error>
530 where
531 S1: AsRef<str>,
532 S2: AsRef<str>,
533 S3: AsRef<str>,
534 H: Into<Option<HashMap<S3, S3>>>,
535 {
536 let object_name = object_name.as_ref();
537 let resources_str = &format!("partNumber={}&uploadId={}", chunk.number, upload_id);
538
539 let host = self.host(self.bucket(), object_name, resources_str);
540 let date = self.date();
541 let mut headers = if let Some(h) = headers.into() {
542 to_headers(h)?
543 } else {
544 HeaderMap::new()
545 };
546 headers.insert(DATE, date.parse()?);
547
548 let authorization = self.oss_sign(
549 "PUT",
550 self.key_id(),
551 self.key_secret(),
552 self.bucket(),
553 object_name,
554 resources_str,
555 &headers,
556 );
557 headers.insert("Authorization", authorization.parse()?);
558
559 let buf = load_chunk_file(file, chunk.offset, chunk.size)?;
560 headers.insert(CONTENT_LENGTH, buf.len().to_string().parse()?);
561
562 let resp = self
563 .client
564 .put(&host)
565 .headers(headers)
566 .body(buf)
567 .send()
568 .await?;
569
570 if resp.status().is_success() {
571 let etag = resp.headers().get(ETAG).unwrap().to_str().unwrap();
572 Ok(etag.to_owned())
573 } else {
574 Err(Error::Object(ObjectError::PutError {
575 msg: format!("can not put object, reason: {:?}", resp.text().await).into(),
576 }))
577 }
578 }
579
580 async fn complete_multipart_upload<S1, S3, H>(
582 &self,
583 object_name: S1,
584 upload_id: String,
585 complete: CompleteMultipartUpload,
586 headers: H,
587 ) -> Result<(), Error>
588 where
589 S1: AsRef<str>,
590 S3: AsRef<str>,
591 H: Into<Option<HashMap<S3, S3>>>,
592 {
593 let object_name = object_name.as_ref();
594 let resources_str = &format!("uploadId={}", upload_id);
595
596 let host = self.host(self.bucket(), object_name, resources_str);
597 let buf = get_complete_str(complete);
598 let date = self.date();
599 let mut headers = if let Some(h) = headers.into() {
600 to_headers(h)?
601 } else {
602 HeaderMap::new()
603 };
604 headers.insert(DATE, date.parse()?);
605 let authorization = self.oss_sign(
606 "POST",
607 self.key_id(),
608 self.key_secret(),
609 self.bucket(),
610 object_name,
611 resources_str,
612 &headers,
613 );
614 headers.insert("Authorization", authorization.parse()?);
615 headers.insert(CONTENT_LENGTH, buf.len().to_string().parse()?);
616
617 let resp = self
618 .client
619 .post(&host)
620 .headers(headers)
621 .body(buf)
622 .send()
623 .await?;
624
625 if resp.status().is_success() {
626 Ok(())
627 } else {
628 Err(Error::Object(ObjectError::PutError {
629 msg: format!("can not put object, status code: {:?}", resp.text().await).into(),
630 }))
631 }
632 }
633
634 pub async fn chunk_upload_by_size<S1, H>(
636 &self,
637 object_name: S1,
638 file: S1,
639 chunk_size: u64,
640 headers: H,
641 ) -> Result<(), Error>
642 where
643 S1: AsRef<str>,
644 H: Into<Option<HashMap<S1, S1>>>,
645 {
646 let object_name = object_name.as_ref();
647 let file = file.as_ref();
648 let upload_id = self.initiate_multipart_upload(object_name, headers).await?;
650 let chunks = split_file_by_part_size(file, chunk_size).await.unwrap();
652 let mut parts = vec![];
654 for chunk in chunks {
655 let etag = self
656 .upload_part(
657 file,
658 object_name,
659 chunk.clone(),
660 upload_id.clone(),
661 None::<HashMap<&str, &str>>,
662 )
663 .await?;
664 parts.push(Part {
665 PartNumber: chunk.number,
666 ETag: etag,
667 });
668 }
669 self.complete_multipart_upload(
671 object_name,
672 upload_id,
673 CompleteMultipartUpload { Part: parts },
674 None::<HashMap<&str, &str>>,
675 )
676 .await
677 }
678
679 pub async fn delete_object<S>(&self, object_name: S) -> Result<(), Error>
680 where
681 S: AsRef<str>,
682 {
683 let object_name = object_name.as_ref();
684 let host = self.host(self.bucket(), object_name, "");
685 let date = self.date();
686
687 let mut headers = HeaderMap::new();
688 headers.insert(DATE, date.parse()?);
689 let authorization = self.oss_sign(
690 "DELETE",
691 self.key_id(),
692 self.key_secret(),
693 self.bucket(),
694 object_name,
695 "",
696 &headers,
697 );
698 headers.insert("Authorization", authorization.parse()?);
699
700 let resp = self.client.delete(&host).headers(headers).send().await?;
701
702 if resp.status().is_success() {
703 Ok(())
704 } else {
705 Err(Error::Object(ObjectError::DeleteError {
706 msg: format!("can not delete object, reason: {:?}", resp.text().await).into(),
707 }))
708 }
709 }
710}
711
712#[derive(Debug, Deserialize, PartialEq, Serialize)]
720pub struct CompleteMultipartUpload {
721 Part: Vec<Part>,
722}
723
724#[derive(Debug, Deserialize, PartialEq, Serialize)]
725pub struct Part {
726 PartNumber: u64,
727 ETag: String,
728}
729
730fn get_complete_str(complete: CompleteMultipartUpload) -> String {
731 let mut str = String::from("<CompleteMultipartUpload>");
732 for p in complete.Part {
733 str.push_str(&to_string(&p).unwrap());
734 }
735 str.push_str("</CompleteMultipartUpload>");
736 str
737}
738
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds an instance with placeholder credentials, endpoint and bucket.
    fn get_oss_instance() -> OSS {
        OSS::new(
            "xxx".to_string(),
            "xxx".to_string(),
            "xxx.aliyuncs.com".to_string(),
            "xxx".to_string(),
        )
    }

    /// Uploads a local file as `objectName` and expects success.
    async fn put_object(oss_instance: &OSS) {
        let result = oss_instance
            .put_object_from_file(
                "/xxxxx/Cargo.toml",
                "objectName",
                None::<HashMap<&str, &str>>,
                None,
            )
            .await;
        assert!(result.is_ok());
    }

    /// Downloads `objectName`, expects success and prints the body.
    async fn get_object(oss_instance: &OSS) {
        let result = oss_instance
            .get_object("objectName", None::<HashMap<&str, &str>>, None)
            .await;
        assert!(result.is_ok());
        println!("text = {:?}", String::from_utf8(result.unwrap().to_vec()));
    }

    /// Deletes `objectName` and expects success.
    async fn delete_object(oss_instance: &OSS) {
        let result = oss_instance.delete_object("objectName").await;
        assert!(result.is_ok());
    }

    /// The part list is rendered as one `<Part>` element per entry, in order.
    #[test]
    fn test_get_complete_str() {
        let first = Part {
            PartNumber: 2,
            ETag: r#""test""#.to_string(),
        };
        let second = Part {
            PartNumber: 2,
            ETag: r#""123""#.to_string(),
        };
        let complete = CompleteMultipartUpload {
            Part: vec![first, second],
        };

        let str = get_complete_str(complete);
        assert_eq!(str, "<CompleteMultipartUpload><Part><PartNumber>2</PartNumber><ETag>\"test\"</ETag></Part><Part><PartNumber>2</PartNumber><ETag>\"123\"</ETag></Part></CompleteMultipartUpload>");
    }

    /// Put, get and delete round trip against the configured endpoint.
    #[tokio::test]
    async fn test_oss() {
        let oss_instance = get_oss_instance();
        put_object(&oss_instance).await;
        get_object(&oss_instance).await;
        delete_object(&oss_instance).await;
    }

    /// Multipart upload of a local file against the configured endpoint.
    #[tokio::test]
    async fn test_oss_multi_upload() {
        let oss_instance = get_oss_instance();
        let object_name = "object_name";
        let file = "/tmp/tmp.txt";
        let chunk_size = 102400;

        let res = oss_instance
            .chunk_upload_by_size(object_name, file, chunk_size, None::<HashMap<&str, &str>>)
            .await;
        println!("res: {:?}", res);
        assert!(res.is_ok());
    }
}