oss_sdk/
oss.rs

1use super::errors::Error;
2use bytes::Bytes;
3use chrono::prelude::*;
4use quick_xml::{events::Event, Reader};
5use reqwest::header::{HeaderMap, CONTENT_LENGTH, DATE, ETAG};
6use reqwest::Client;
7use serde_derive::{Deserialize, Serialize};
8use serde_xml_rs::{from_str, to_string};
9use std::collections::HashMap;
10use std::str;
11
12use crate::bucket::{Bucket, ListBuckets};
13use crate::errors::ObjectError;
14
15use super::auth::*;
16use super::utils::*;
17
18#[derive(Clone, Debug)]
19pub struct OSS {
20    key_id: String,
21    key_secret: String,
22    endpoint: String,
23    bucket: String,
24    pub client: Client,
25}
26
27const RESOURCES: [&str; 50] = [
28    "acl",
29    "uploads",
30    "location",
31    "cors",
32    "logging",
33    "website",
34    "referer",
35    "lifecycle",
36    "delete",
37    "append",
38    "tagging",
39    "objectMeta",
40    "uploadId",
41    "partNumber",
42    "security-token",
43    "position",
44    "img",
45    "style",
46    "styleName",
47    "replication",
48    "replicationProgress",
49    "replicationLocation",
50    "cname",
51    "bucketInfo",
52    "comp",
53    "qos",
54    "live",
55    "status",
56    "vod",
57    "startTime",
58    "endTime",
59    "symlink",
60    "x-oss-process",
61    "response-content-type",
62    "response-content-language",
63    "response-expires",
64    "response-cache-control",
65    "response-content-disposition",
66    "response-content-encoding",
67    "udf",
68    "udfName",
69    "udfImage",
70    "udfId",
71    "udfImageDesc",
72    "udfApplication",
73    "comp",
74    "udfApplicationLog",
75    "restore",
76    "callback",
77    "callback-var",
78];
79
80impl OSS {
81    pub fn new(key_id: String, key_secret: String, endpoint: String, bucket: String) -> Self {
82        OSS {
83            key_id: key_id,
84            key_secret: key_secret,
85            endpoint: endpoint,
86            bucket: bucket,
87            client: reqwest::Client::new(),
88        }
89    }
90
91    pub fn bucket(&self) -> &str {
92        &self.bucket
93    }
94
95    pub fn endpoint(&self) -> &str {
96        &self.endpoint
97    }
98
99    pub fn key_id(&self) -> &str {
100        &self.key_id
101    }
102
103    pub fn key_secret(&self) -> &str {
104        &self.key_secret
105    }
106
107    pub fn set_bucket(&mut self, bucket: &str) {
108        self.bucket = bucket.to_string()
109    }
110
111    pub fn host(&self, bucket: &str, object: &str, resources_str: &str) -> String {
112        if self.endpoint.starts_with("https") {
113            format!(
114                "https://{}.{}/{}?{}",
115                bucket,
116                self.endpoint.replacen("https://", "", 1),
117                object,
118                resources_str
119            )
120        } else {
121            format!(
122                "http://{}.{}/{}?{}",
123                bucket,
124                self.endpoint.replacen("http://", "", 1),
125                object,
126                resources_str
127            )
128        }
129    }
130
131    pub fn date(&self) -> String {
132        let now: DateTime<Utc> = Utc::now();
133        now.format("%a, %d %b %Y %T GMT").to_string()
134    }
135
136    pub fn get_resources_str<S>(&self, params: HashMap<S, Option<S>>) -> String
137    where
138        S: AsRef<str>,
139    {
140        let mut resources: Vec<(&S, &Option<S>)> = params
141            .iter()
142            .filter(|(k, _)| RESOURCES.contains(&k.as_ref()))
143            .collect();
144        resources.sort_by(|a, b| a.0.as_ref().to_string().cmp(&b.0.as_ref().to_string()));
145        let mut result = String::new();
146        for (k, v) in resources {
147            if !result.is_empty() {
148                result += "&";
149            }
150            if let Some(vv) = v {
151                result += &format!("{}={}", k.as_ref().to_owned(), vv.as_ref());
152            } else {
153                result += k.as_ref();
154            }
155        }
156        result
157    }
158
159    pub async fn list_bucket<S, R>(&self, resources: R) -> Result<ListBuckets, Error>
160    where
161        S: AsRef<str>,
162        R: Into<Option<HashMap<S, Option<S>>>>,
163    {
164        let resources_str = if let Some(r) = resources.into() {
165            self.get_resources_str(r)
166        } else {
167            String::new()
168        };
169        let host = self.endpoint();
170        let date = self.date();
171
172        let mut headers = HeaderMap::new();
173        headers.insert(DATE, date.parse()?);
174        let authorization = self.oss_sign(
175            "GET",
176            self.key_id(),
177            self.key_secret(),
178            "",
179            "",
180            &resources_str,
181            &headers,
182        );
183        headers.insert("Authorization", authorization.parse()?);
184
185        let resp = self.client.get(host).headers(headers).send().await?;
186
187        let xml_str = resp.text().await?;
188        let mut result = Vec::new();
189        let mut reader = Reader::from_str(xml_str.as_str());
190        reader.trim_text(true);
191        let mut buf = Vec::new();
192
193        let mut prefix = String::new();
194        let mut marker = String::new();
195        let mut max_keys = String::new();
196        let mut is_truncated = false;
197        let mut next_marker = String::new();
198        let mut id = String::new();
199        let mut display_name = String::new();
200
201        let mut name = String::new();
202        let mut location = String::new();
203        let mut create_date = String::new();
204        let mut extranet_endpoint = String::new();
205        let mut intranet_endpoint = String::new();
206        let mut storage_class = String::new();
207
208        let list_buckets;
209
210        loop {
211            match reader.read_event(&mut buf) {
212                Ok(Event::Start(ref e)) => match e.name() {
213                    b"Prefix" => prefix = reader.read_text(e.name(), &mut Vec::new())?,
214                    b"Marker" => marker = reader.read_text(e.name(), &mut Vec::new())?,
215                    b"MaxKeys" => max_keys = reader.read_text(e.name(), &mut Vec::new())?,
216                    b"IsTruncated" => {
217                        is_truncated = reader.read_text(e.name(), &mut Vec::new())? == "true"
218                    }
219                    b"NextMarker" => next_marker = reader.read_text(e.name(), &mut Vec::new())?,
220                    b"ID" => id = reader.read_text(e.name(), &mut Vec::new())?,
221                    b"DisplayName" => display_name = reader.read_text(e.name(), &mut Vec::new())?,
222
223                    b"Bucket" => {
224                        name = String::new();
225                        location = String::new();
226                        create_date = String::new();
227                        extranet_endpoint = String::new();
228                        intranet_endpoint = String::new();
229                        storage_class = String::new();
230                    }
231
232                    b"Name" => name = reader.read_text(e.name(), &mut Vec::new())?,
233                    b"CreationDate" => create_date = reader.read_text(e.name(), &mut Vec::new())?,
234                    b"ExtranetEndpoint" => {
235                        extranet_endpoint = reader.read_text(e.name(), &mut Vec::new())?
236                    }
237                    b"IntranetEndpoint" => {
238                        intranet_endpoint = reader.read_text(e.name(), &mut Vec::new())?
239                    }
240                    b"Location" => location = reader.read_text(e.name(), &mut Vec::new())?,
241                    b"StorageClass" => {
242                        storage_class = reader.read_text(e.name(), &mut Vec::new())?
243                    }
244                    _ => (),
245                },
246                Ok(Event::End(ref e)) if e.name() == b"Bucket" => {
247                    let bucket = Bucket::new(
248                        name.clone(),
249                        create_date.clone(),
250                        location.clone(),
251                        extranet_endpoint.clone(),
252                        intranet_endpoint.clone(),
253                        storage_class.clone(),
254                    );
255                    result.push(bucket);
256                }
257                Ok(Event::Eof) => {
258                    list_buckets = ListBuckets::new(
259                        prefix,
260                        marker,
261                        max_keys,
262                        is_truncated,
263                        next_marker,
264                        id,
265                        display_name,
266                        result,
267                    );
268                    break;
269                } // exits the loop when reaching end of file
270                Err(e) => panic!("Error at position {}: {:?}", reader.buffer_position(), e),
271                _ => (), // There are several other `Event`s we do not consider here
272            }
273            buf.clear();
274        }
275        Ok(list_buckets)
276    }
277
278    pub async fn get_object<S>(
279        &self,
280        object: S,
281        headers: Option<HashMap<S, S>>,
282        resources: Option<HashMap<S, Option<S>>>,
283    ) -> Result<Bytes, reqwest::Error>
284    where
285        S: AsRef<str>,
286    {
287        let object = object.as_ref();
288        let resources_str = if let Some(r) = resources {
289            self.get_resources_str(r)
290        } else {
291            String::new()
292        };
293        let host = self.host(self.bucket(), object, &resources_str);
294        let date = self.date();
295        let mut headers = if let Some(h) = headers {
296            to_headers(h).unwrap()
297        } else {
298            HeaderMap::new()
299        };
300        headers.insert(DATE, date.parse().unwrap());
301        let authorization = self.oss_sign(
302            "GET",
303            self.key_id(),
304            self.key_secret(),
305            self.bucket(),
306            object,
307            &resources_str,
308            &headers,
309        );
310        headers.insert("Authorization", authorization.parse().unwrap());
311
312        let res = reqwest::Client::new()
313            .get(&host)
314            .headers(headers)
315            .send()
316            .await?;
317        Ok(res.bytes().await?)
318    }
319
320    pub async fn head_object<S>(
321        &self,
322        object: S,
323        headers: Option<HashMap<S, S>>,
324        resources: Option<HashMap<S, Option<S>>>,
325    ) -> Result<HeaderMap, reqwest::Error>
326    where
327        S: AsRef<str>,
328    {
329        let object = object.as_ref();
330        let resources_str = if let Some(r) = resources {
331            self.get_resources_str(r)
332        } else {
333            String::new()
334        };
335        let host = self.host(self.bucket(), object, &resources_str);
336        let date = self.date();
337        let mut headers = if let Some(h) = headers {
338            to_headers(h).unwrap()
339        } else {
340            HeaderMap::new()
341        };
342        headers.insert(DATE, date.parse().unwrap());
343        let authorization = self.oss_sign(
344            "HEAD",
345            self.key_id(),
346            self.key_secret(),
347            self.bucket(),
348            object,
349            &resources_str,
350            &headers,
351        );
352        headers.insert("Authorization", authorization.parse().unwrap());
353
354        let res = reqwest::Client::new()
355            .head(&host)
356            .headers(headers)
357            .send()
358            .await?;
359        Ok(res.headers().clone())
360    }
361
362    pub async fn put_object_from_buffer<S1, S2, H, R>(
363        &self,
364        buf: &[u8],
365        object: S1,
366        headers: H,
367        resources: R,
368    ) -> Result<Bytes, reqwest::Error>
369    where
370        S1: AsRef<str>,
371        S2: AsRef<str>,
372        H: Into<Option<HashMap<S2, S2>>>,
373        R: Into<Option<HashMap<S2, Option<S2>>>>,
374    {
375        let object = object.as_ref();
376        let resources_str = if let Some(r) = resources.into() {
377            self.get_resources_str(r)
378        } else {
379            String::new()
380        };
381        let host = self.host(self.bucket(), object, &resources_str);
382        let date = self.date();
383
384        let mut headers = if let Some(h) = headers.into() {
385            to_headers(h).unwrap()
386        } else {
387            HeaderMap::new()
388        };
389        headers.insert(DATE, date.parse().unwrap());
390        let authorization = self.oss_sign(
391            "PUT",
392            self.key_id(),
393            self.key_secret(),
394            self.bucket(),
395            object,
396            &resources_str,
397            &headers,
398        );
399        headers.insert("Authorization", authorization.parse().unwrap());
400
401        let res = reqwest::Client::new()
402            .put(&host)
403            .headers(headers)
404            .body(buf.to_owned())
405            .send()
406            .await?;
407        Ok(res.bytes().await?)
408    }
409
410    pub async fn put_object_from_file<S1, S2, S3, H, R>(
411        &self,
412        file: S1,
413        object_name: S2,
414        headers: H,
415        resources: R,
416    ) -> Result<(), Error>
417    where
418        S1: AsRef<str>,
419        S2: AsRef<str>,
420        S3: AsRef<str>,
421        H: Into<Option<HashMap<S3, S3>>>,
422        R: Into<Option<HashMap<S3, Option<S3>>>>,
423    {
424        let object_name = object_name.as_ref();
425        let resources_str = if let Some(r) = resources.into() {
426            self.get_resources_str(r)
427        } else {
428            String::new()
429        };
430        let host = self.host(self.bucket(), object_name, &resources_str);
431        let date = self.date();
432        let buf = load_file(file)?;
433        let mut headers = if let Some(h) = headers.into() {
434            to_headers(h)?
435        } else {
436            HeaderMap::new()
437        };
438        headers.insert(DATE, date.parse()?);
439        headers.insert(CONTENT_LENGTH, buf.len().to_string().parse()?);
440        let authorization = self.oss_sign(
441            "PUT",
442            self.key_id(),
443            self.key_secret(),
444            self.bucket(),
445            object_name,
446            &resources_str,
447            &headers,
448        );
449        headers.insert("Authorization", authorization.parse()?);
450
451        let resp = self
452            .client
453            .put(&host)
454            .headers(headers)
455            .body(buf)
456            .send()
457            .await?;
458
459        if resp.status().is_success() {
460            Ok(())
461        } else {
462            Err(Error::Object(ObjectError::PutError {
463                msg: format!("can not put object, reason: {:?}", resp.text().await).into(),
464            }))
465        }
466    }
467
468    // https://help.aliyun.com/document_detail/31992.html
469    async fn initiate_multipart_upload<S2, S3, H>(
470        &self,
471        object_name: S2,
472        headers: H,
473    ) -> Result<String, Error>
474    where
475        S2: AsRef<str>,
476        S3: AsRef<str>,
477        H: Into<Option<HashMap<S3, S3>>>,
478    {
479        let object_name = object_name.as_ref();
480        let resources_str = "uploads";
481
482        let host = self.host(self.bucket(), object_name, resources_str);
483        let date = self.date();
484        let mut headers = if let Some(h) = headers.into() {
485            to_headers(h)?
486        } else {
487            HeaderMap::new()
488        };
489        headers.insert(DATE, date.parse()?);
490        let authorization = self.oss_sign(
491            "POST",
492            self.key_id(),
493            self.key_secret(),
494            self.bucket(),
495            object_name,
496            resources_str,
497            &headers,
498        );
499        headers.insert("Authorization", authorization.parse()?);
500
501        let resp = self.client.post(&host).headers(headers).send().await?;
502
503        if resp.status().is_success() {
504            #[derive(Debug, Serialize, Deserialize, PartialEq)]
505            struct InitiateMultipartUploadResult {
506                Bucket: String,
507                Key: String,
508                UploadId: String,
509            }
510
511            let init: InitiateMultipartUploadResult =
512                from_str(&resp.text().await.unwrap()).unwrap();
513            Ok(init.UploadId)
514        } else {
515            Err(Error::Object(ObjectError::PutError {
516                msg: format!("can not put object, reason: {:?}", resp.text().await).into(),
517            }))
518        }
519    }
520
521    // https://help.aliyun.com/document_detail/31993.html
522    async fn upload_part<S1, S2, S3, H>(
523        &self,
524        file: S1,
525        object_name: S2,
526        chunk: FileChunk,
527        upload_id: String,
528        headers: H,
529    ) -> Result<String, Error>
530    where
531        S1: AsRef<str>,
532        S2: AsRef<str>,
533        S3: AsRef<str>,
534        H: Into<Option<HashMap<S3, S3>>>,
535    {
536        let object_name = object_name.as_ref();
537        let resources_str = &format!("partNumber={}&uploadId={}", chunk.number, upload_id);
538
539        let host = self.host(self.bucket(), object_name, resources_str);
540        let date = self.date();
541        let mut headers = if let Some(h) = headers.into() {
542            to_headers(h)?
543        } else {
544            HeaderMap::new()
545        };
546        headers.insert(DATE, date.parse()?);
547
548        let authorization = self.oss_sign(
549            "PUT",
550            self.key_id(),
551            self.key_secret(),
552            self.bucket(),
553            object_name,
554            resources_str,
555            &headers,
556        );
557        headers.insert("Authorization", authorization.parse()?);
558
559        let buf = load_chunk_file(file, chunk.offset, chunk.size)?;
560        headers.insert(CONTENT_LENGTH, buf.len().to_string().parse()?);
561
562        let resp = self
563            .client
564            .put(&host)
565            .headers(headers)
566            .body(buf)
567            .send()
568            .await?;
569
570        if resp.status().is_success() {
571            let etag = resp.headers().get(ETAG).unwrap().to_str().unwrap();
572            Ok(etag.to_owned())
573        } else {
574            Err(Error::Object(ObjectError::PutError {
575                msg: format!("can not put object, reason: {:?}", resp.text().await).into(),
576            }))
577        }
578    }
579
580    // https://help.aliyun.com/document_detail/31993.html
581    async fn complete_multipart_upload<S1, S3, H>(
582        &self,
583        object_name: S1,
584        upload_id: String,
585        complete: CompleteMultipartUpload,
586        headers: H,
587    ) -> Result<(), Error>
588    where
589        S1: AsRef<str>,
590        S3: AsRef<str>,
591        H: Into<Option<HashMap<S3, S3>>>,
592    {
593        let object_name = object_name.as_ref();
594        let resources_str = &format!("uploadId={}", upload_id);
595
596        let host = self.host(self.bucket(), object_name, resources_str);
597        let buf = get_complete_str(complete);
598        let date = self.date();
599        let mut headers = if let Some(h) = headers.into() {
600            to_headers(h)?
601        } else {
602            HeaderMap::new()
603        };
604        headers.insert(DATE, date.parse()?);
605        let authorization = self.oss_sign(
606            "POST",
607            self.key_id(),
608            self.key_secret(),
609            self.bucket(),
610            object_name,
611            resources_str,
612            &headers,
613        );
614        headers.insert("Authorization", authorization.parse()?);
615        headers.insert(CONTENT_LENGTH, buf.len().to_string().parse()?);
616
617        let resp = self
618            .client
619            .post(&host)
620            .headers(headers)
621            .body(buf)
622            .send()
623            .await?;
624
625        if resp.status().is_success() {
626            Ok(())
627        } else {
628            Err(Error::Object(ObjectError::PutError {
629                msg: format!("can not put object, status code: {:?}", resp.text().await).into(),
630            }))
631        }
632    }
633
634    // <MinSizeAllowed>102400</MinSizeAllowed>
635    pub async fn chunk_upload_by_size<S1, H>(
636        &self,
637        object_name: S1,
638        file: S1,
639        chunk_size: u64,
640        headers: H,
641    ) -> Result<(), Error>
642    where
643        S1: AsRef<str>,
644        H: Into<Option<HashMap<S1, S1>>>,
645    {
646        let object_name = object_name.as_ref();
647        let file = file.as_ref();
648        // init multi upload
649        let upload_id = self.initiate_multipart_upload(object_name, headers).await?;
650        // chunk object
651        let chunks = split_file_by_part_size(file, chunk_size).await.unwrap();
652        // part upload
653        let mut parts = vec![];
654        for chunk in chunks {
655            let etag = self
656                .upload_part(
657                    file,
658                    object_name,
659                    chunk.clone(),
660                    upload_id.clone(),
661                    None::<HashMap<&str, &str>>,
662                )
663                .await?;
664            parts.push(Part {
665                PartNumber: chunk.number,
666                ETag: etag,
667            });
668        }
669        // complete multi upload
670        self.complete_multipart_upload(
671            object_name,
672            upload_id,
673            CompleteMultipartUpload { Part: parts },
674            None::<HashMap<&str, &str>>,
675        )
676        .await
677    }
678
679    pub async fn delete_object<S>(&self, object_name: S) -> Result<(), Error>
680    where
681        S: AsRef<str>,
682    {
683        let object_name = object_name.as_ref();
684        let host = self.host(self.bucket(), object_name, "");
685        let date = self.date();
686
687        let mut headers = HeaderMap::new();
688        headers.insert(DATE, date.parse()?);
689        let authorization = self.oss_sign(
690            "DELETE",
691            self.key_id(),
692            self.key_secret(),
693            self.bucket(),
694            object_name,
695            "",
696            &headers,
697        );
698        headers.insert("Authorization", authorization.parse()?);
699
700        let resp = self.client.delete(&host).headers(headers).send().await?;
701
702        if resp.status().is_success() {
703            Ok(())
704        } else {
705            Err(Error::Object(ObjectError::DeleteError {
706                msg: format!("can not delete object, reason: {:?}", resp.text().await).into(),
707            }))
708        }
709    }
710}
711
712// <CompleteMultipartUpload>
713// <Part>
714// <PartNumber>PartNumber</PartNumber>
715// <ETag>ETag</ETag>
716// </Part>
717// ...
718// </CompleteMultipartUpload>
719#[derive(Debug, Deserialize, PartialEq, Serialize)]
720pub struct CompleteMultipartUpload {
721    Part: Vec<Part>,
722}
723
724#[derive(Debug, Deserialize, PartialEq, Serialize)]
725pub struct Part {
726    PartNumber: u64,
727    ETag: String,
728}
729
730fn get_complete_str(complete: CompleteMultipartUpload) -> String {
731    let mut str = String::from("<CompleteMultipartUpload>");
732    for p in complete.Part {
733        str.push_str(&to_string(&p).unwrap());
734    }
735    str.push_str("</CompleteMultipartUpload>");
736    str
737}
738
739#[cfg(test)]
740mod tests {
741    use super::*;
742
743    #[test]
744    // https://github.com/RReverser/serde-xml-rs
745    // waiting for the serde-xml-rs to fix the serde vector bug
746    fn test_get_complete_str() {
747        let complete = CompleteMultipartUpload {
748            Part: vec![
749                Part {
750                    PartNumber: 2,
751                    ETag: r#""test""#.to_string(),
752                },
753                Part {
754                    PartNumber: 2,
755                    ETag: r#""123""#.to_string(),
756                },
757            ],
758        };
759
760        let str = get_complete_str(complete);
761        assert_eq!(str, "<CompleteMultipartUpload><Part><PartNumber>2</PartNumber><ETag>\"test\"</ETag></Part><Part><PartNumber>2</PartNumber><ETag>\"123\"</ETag></Part></CompleteMultipartUpload>");
762    }
763
764    fn get_oss_instance() -> OSS {
765        let oss_instance = OSS::new(
766            "xxx".to_string(),
767            "xxx".to_string(),
768            "xxx.aliyuncs.com".to_string(),
769            "xxx".to_string(),
770        );
771        oss_instance
772    }
773
774    #[tokio::test]
775    async fn test_oss() {
776        let oss_instance = get_oss_instance();
777        put_object(&oss_instance).await;
778        get_object(&oss_instance).await;
779        delete_object(&oss_instance).await;
780    }
781
782    #[tokio::test]
783    async fn test_oss_multi_upload() {
784        let oss_instance = get_oss_instance();
785        let object_name = "object_name";
786        let file = "/tmp/tmp.txt";
787        let chunk_size = 102400;
788
789        let res = oss_instance
790            .chunk_upload_by_size(object_name, file, chunk_size, None::<HashMap<&str, &str>>)
791            .await;
792        println!("res: {:?}", res);
793        assert!(res.is_ok());
794    }
795
796    async fn put_object(oss_instance: &OSS) {
797        let result = oss_instance
798            .put_object_from_file(
799                "/xxxxx/Cargo.toml",
800                "objectName",
801                None::<HashMap<&str, &str>>,
802                None,
803            )
804            .await;
805        assert_eq!(result.is_ok(), true);
806    }
807
808    async fn get_object(oss_instance: &OSS) {
809        let result = oss_instance
810            .get_object("objectName", None::<HashMap<&str, &str>>, None)
811            .await;
812        assert_eq!(result.is_ok(), true);
813        println!("text = {:?}", String::from_utf8(result.unwrap().to_vec()));
814    }
815
816    async fn delete_object(oss_instance: &OSS) {
817        let result = oss_instance.delete_object("objectName").await;
818        assert_eq!(result.is_ok(), true);
819    }
820}