oss_rust_sdk/
async_object.rs

1use std::collections::HashMap;
2
3use crate::{
4    multi_part::{CompleteMultipartUploadResult, InitiateMultipartUploadResult},
5    oss::{ObjectMeta, RequestType},
6    prelude::{ListObjects, OSS},
7};
8
9use super::errors::{Error, ObjectError};
10
11use async_trait::async_trait;
12use bytes::Bytes;
13
14#[async_trait]
15pub trait AsyncObjectAPI {
16    async fn list_object<S, H, R>(&self, headers: H, resources: R) -> Result<ListObjects, Error>
17    where
18        S: AsRef<str>,
19        H: Into<Option<HashMap<S, S>>> + Send,
20        R: Into<Option<HashMap<S, Option<S>>>> + Send;
21
22    async fn get_object<S1, S2, H, R>(
23        &self,
24        object_name: S1,
25        headers: H,
26        resources: R,
27    ) -> Result<Bytes, Error>
28    where
29        S1: AsRef<str> + Send,
30        S2: AsRef<str> + Send,
31        H: Into<Option<HashMap<S2, S2>>> + Send,
32        R: Into<Option<HashMap<S2, Option<S2>>>> + Send;
33
34    async fn put_object<S1, S2, H, R>(
35        &self,
36        buf: &[u8],
37        object_name: S1,
38        headers: H,
39        resources: R,
40    ) -> Result<(), Error>
41    where
42        S1: AsRef<str> + Send,
43        S2: AsRef<str> + Send,
44        H: Into<Option<HashMap<S2, S2>>> + Send,
45        R: Into<Option<HashMap<S2, Option<S2>>>> + Send;
46
47    async fn copy_object_from_object<S1, S2, S3, H, R>(
48        &self,
49        src: S1,
50        dest: S2,
51        headers: H,
52        resources: R,
53    ) -> Result<(), Error>
54    where
55        S1: AsRef<str> + Send,
56        S2: AsRef<str> + Send,
57        S3: AsRef<str> + Send,
58        H: Into<Option<HashMap<S3, S3>>> + Send,
59        R: Into<Option<HashMap<S3, Option<S3>>>> + Send;
60
61    async fn delete_object<S>(&self, object_name: S) -> Result<(), Error>
62    where
63        S: AsRef<str> + Send;
64
65    async fn head_object<S>(&self, object_name: S) -> Result<ObjectMeta, Error>
66    where
67        S: AsRef<str> + Send;
68
69    /// Notify oss to init a Multipart Upload event
70    async fn init_multi<S1, S2, H, R>(
71        &self,
72        object_name: S1,
73        headers: H,
74        resources: R,
75    ) -> Result<InitiateMultipartUploadResult, Error>
76    where
77        S1: AsRef<str> + Send,
78        S2: AsRef<str> + Send,
79        H: Into<Option<HashMap<S2, S2>>> + Send,
80        R: Into<Option<HashMap<S2, Option<S2>>>> + Send;
81
82    /// Upload data in chunks according to the specified Object name and uploadId
83    async fn upload_part<S1, S2, H, R>(
84        &self,
85        buf: &[u8],
86        object_name: S1,
87        headers: H,
88        resources: R,
89    ) -> Result<String, Error>
90    where
91        S1: AsRef<str> + Send,
92        S2: AsRef<str> + Send,
93        H: Into<Option<HashMap<S2, S2>>> + Send,
94        R: Into<Option<HashMap<S2, Option<S2>>>> + Send;
95
96    /// Complete the multipart upload of the entire file
97    ///
98    /// body format
99    /// <CompleteMultipartUpload>
100    /// <Part>
101    /// <PartNumber>PartNumber</PartNumber>
102    /// <ETag>ETag</ETag>
103    /// </Part>
104    /// ...
105    /// </CompleteMultipartUpload>
106    ///
107    /// # Examples
108    ///
109    ///  #[derive(Debug, Default, Serialize, Deserialize, PartialEq)]
110    ///  #[serde(rename_all = "PascalCase")]
111    ///  pub struct PartWrapper {
112    ///      pub part: Vec<Part>,
113    ///  }
114    ///
115    ///  #[derive(Debug, Serialize, Deserialize, PartialEq)]
116    ///  #[serde(rename_all = "PascalCase")]
117    ///  pub struct Part {
118    ///      part_number: usize,
119    ///      e_tag: String,
120    ///  }
121    ///
122    ///  let parts = CompleteDTO {
123    ///      part: vec![Part {
124    ///          part_number: 1,
125    ///          e_tag: "50BE5FACC702C5B945588031C6*****".to_string(),
126    ///      }],
127    ///  };
128    ///
129    ///  let body = quick_xml::se::to_string_with_root("CompleteMultipartUpload", &parts).unwrap();
130    ///
131    async fn complete_multi<S1, S2, H, R>(
132        &self,
133        body: String,
134        object_name: S1,
135        headers: H,
136        resources: R,
137    ) -> Result<CompleteMultipartUploadResult, Error>
138    where
139        S1: AsRef<str> + Send,
140        S2: AsRef<str> + Send,
141        H: Into<Option<HashMap<S2, S2>>> + Send,
142        R: Into<Option<HashMap<S2, Option<S2>>>> + Send;
143
144    /// Cancel the MultipartUpload event and delete the corresponding Part data
145    async fn abort_multi<S1, S2, H, R>(
146        &self,
147        object_name: S1,
148        headers: H,
149        resources: R,
150    ) -> Result<(), Error>
151    where
152        S1: AsRef<str> + Send,
153        S2: AsRef<str> + Send,
154        H: Into<Option<HashMap<S2, S2>>> + Send,
155        R: Into<Option<HashMap<S2, Option<S2>>>> + Send;
156}
157
158#[async_trait]
159impl<'a> AsyncObjectAPI for OSS<'a> {
160    async fn list_object<S, H, R>(&self, headers: H, resources: R) -> Result<ListObjects, Error>
161    where
162        S: AsRef<str>,
163        H: Into<Option<HashMap<S, S>>> + Send,
164        R: Into<Option<HashMap<S, Option<S>>>> + Send,
165    {
166        let (host, headers) =
167            self.build_request(RequestType::Get, String::new(), headers, resources)?;
168
169        let resp = self.http_client.get(host).headers(headers).send().await?;
170        let body = resp.text().await?;
171        let list_objects = quick_xml::de::from_str::<ListObjects>(&body)?;
172
173        Ok(list_objects)
174    }
175
176    async fn get_object<S1, S2, H, R>(
177        &self,
178        object_name: S1,
179        headers: H,
180        resources: R,
181    ) -> Result<Bytes, Error>
182    where
183        S1: AsRef<str> + Send,
184        S2: AsRef<str> + Send,
185        H: Into<Option<HashMap<S2, S2>>> + Send,
186        R: Into<Option<HashMap<S2, Option<S2>>>> + Send,
187    {
188        let (host, headers) =
189            self.build_request(RequestType::Get, object_name, headers, resources)?;
190
191        let resp = self.http_client.get(&host).headers(headers).send().await?;
192
193        if resp.status().is_success() {
194            Ok(resp.bytes().await?)
195        } else {
196            Err(Error::Object(ObjectError::GetError {
197                msg: format!("can not get object, status code: {}", resp.status()).into(),
198            }))
199        }
200    }
201
202    async fn put_object<S1, S2, H, R>(
203        &self,
204        buf: &[u8],
205        object_name: S1,
206        headers: H,
207        resources: R,
208    ) -> Result<(), Error>
209    where
210        S1: AsRef<str> + Send,
211        S2: AsRef<str> + Send,
212        H: Into<Option<HashMap<S2, S2>>> + Send,
213        R: Into<Option<HashMap<S2, Option<S2>>>> + Send,
214    {
215        let (host, headers) =
216            self.build_request(RequestType::Put, object_name, headers, resources)?;
217
218        let resp = self
219            .http_client
220            .put(&host)
221            .headers(headers)
222            .body(buf.to_owned())
223            .send()
224            .await?;
225
226        if resp.status().is_success() {
227            Ok(())
228        } else {
229            Err(Error::Object(ObjectError::DeleteError {
230                msg: format!(
231                    "can not put object, status code, status code: {}",
232                    resp.status()
233                )
234                .into(),
235            }))
236        }
237    }
238
239    async fn copy_object_from_object<S1, S2, S3, H, R>(
240        &self,
241        src: S1,
242        dest: S2,
243        headers: H,
244        resources: R,
245    ) -> Result<(), Error>
246    where
247        S1: AsRef<str> + Send,
248        S2: AsRef<str> + Send,
249        S3: AsRef<str> + Send,
250        H: Into<Option<HashMap<S3, S3>>> + Send,
251        R: Into<Option<HashMap<S3, Option<S3>>>> + Send,
252    {
253        let (host, mut headers) = self.build_request(RequestType::Put, dest, headers, resources)?;
254        headers.insert("x-oss-copy-source", src.as_ref().parse()?);
255
256        let resp = self.http_client.put(&host).headers(headers).send().await?;
257
258        if resp.status().is_success() {
259            Ok(())
260        } else {
261            Err(Error::Object(ObjectError::CopyError {
262                msg: format!("can not copy object, status code: {}", resp.status()).into(),
263            }))
264        }
265    }
266
267    async fn delete_object<S>(&self, object_name: S) -> Result<(), Error>
268    where
269        S: AsRef<str> + Send,
270    {
271        let headers = HashMap::<String, String>::new();
272        let (host, headers) =
273            self.build_request(RequestType::Delete, object_name, Some(headers), None)?;
274
275        let resp = self
276            .http_client
277            .delete(&host)
278            .headers(headers)
279            .send()
280            .await?;
281
282        if resp.status().is_success() {
283            Ok(())
284        } else {
285            Err(Error::Object(ObjectError::DeleteError {
286                msg: format!("can not delete object, status code: {}", resp.status()).into(),
287            }))
288        }
289    }
290
291    async fn head_object<S>(&self, object_name: S) -> Result<ObjectMeta, Error>
292    where
293        S: AsRef<str> + Send,
294    {
295        let (host, headers) = self.build_request(
296            RequestType::Head,
297            object_name,
298            None::<HashMap<String, String>>,
299            None,
300        )?;
301
302        let resp = self.http_client.head(&host).headers(headers).send().await?;
303
304        if resp.status().is_success() {
305            Ok(ObjectMeta::from_header_map(resp.headers())?)
306        } else {
307            Err(Error::Object(ObjectError::DeleteError {
308                msg: format!("can not head object, status code: {}", resp.status()).into(),
309            }))
310        }
311    }
312
313    async fn init_multi<S1, S2, H, R>(
314        &self,
315        object_name: S1,
316        headers: H,
317        resources: R,
318    ) -> Result<InitiateMultipartUploadResult, Error>
319    where
320        S1: AsRef<str> + Send,
321        S2: AsRef<str> + Send,
322        H: Into<Option<HashMap<S2, S2>>> + Send,
323        R: Into<Option<HashMap<S2, Option<S2>>>> + Send,
324    {
325        let (host, headers) =
326            self.build_request(RequestType::Post, object_name, headers, resources)?;
327
328        let resp = self.http_client.post(&host).headers(headers).send().await?;
329
330        if resp.status().is_success() {
331            let body = resp.text().await?;
332            let res = quick_xml::de::from_str::<InitiateMultipartUploadResult>(&body)?;
333            Ok(res)
334        } else {
335            Err(Error::Object(ObjectError::PostError {
336                msg: format!(
337                    "init multi failed, status code, status code: {}",
338                    resp.status()
339                )
340                .into(),
341            }))
342        }
343    }
344
345    async fn upload_part<S1, S2, H, R>(
346        &self,
347        buf: &[u8],
348        object_name: S1,
349        headers: H,
350        resources: R,
351    ) -> Result<String, Error>
352    where
353        S1: AsRef<str> + Send,
354        S2: AsRef<str> + Send,
355        H: Into<Option<HashMap<S2, S2>>> + Send,
356        R: Into<Option<HashMap<S2, Option<S2>>>> + Send,
357    {
358        let (host, headers) =
359            self.build_request(RequestType::Put, object_name, headers, resources)?;
360
361        let resp = self
362            .http_client
363            .put(&host)
364            .headers(headers)
365            .body(buf.to_owned())
366            .send()
367            .await?;
368
369        if resp.status().is_success() {
370            let e_tag = resp.headers().get("ETag").unwrap().to_str().unwrap();
371            Ok(e_tag.to_string())
372        } else {
373            Err(Error::Object(ObjectError::PutError {
374                msg: format!(
375                    "can not put object, status code, status code: {}",
376                    resp.status()
377                )
378                .into(),
379            }))
380        }
381    }
382
383    async fn complete_multi<S1, S2, H, R>(
384        &self,
385        body: String,
386        object_name: S1,
387        headers: H,
388        resources: R,
389    ) -> Result<CompleteMultipartUploadResult, Error>
390    where
391        S1: AsRef<str> + Send,
392        S2: AsRef<str> + Send,
393        H: Into<Option<HashMap<S2, S2>>> + Send,
394        R: Into<Option<HashMap<S2, Option<S2>>>> + Send,
395    {
396        let (host, headers) =
397            self.build_request(RequestType::Post, object_name, headers, resources)?;
398
399        let resp = self
400            .http_client
401            .post(&host)
402            .headers(headers)
403            .body(body)
404            .send()
405            .await?;
406
407        if resp.status().is_success() {
408            let body = resp.text().await?;
409            let res = quick_xml::de::from_str::<CompleteMultipartUploadResult>(&body)?;
410            Ok(res)
411        } else {
412            Err(Error::Object(ObjectError::PostError {
413                msg: format!(
414                    "complete multi failed, status code, status code: {}",
415                    resp.status()
416                )
417                .into(),
418            }))
419        }
420    }
421
422    async fn abort_multi<S1, S2, H, R>(
423        &self,
424        object_name: S1,
425        headers: H,
426        resources: R,
427    ) -> Result<(), Error>
428    where
429        S1: AsRef<str> + Send,
430        S2: AsRef<str> + Send,
431        H: Into<Option<HashMap<S2, S2>>> + Send,
432        R: Into<Option<HashMap<S2, Option<S2>>>> + Send,
433    {
434        let (host, headers) =
435            self.build_request(RequestType::Delete, object_name, headers, resources)?;
436
437        let resp = self
438            .http_client
439            .delete(&host)
440            .headers(headers)
441            .send()
442            .await?;
443
444        if resp.status().is_success() {
445            Ok(())
446        } else {
447            Err(Error::Object(ObjectError::DeleteError {
448                msg: format!(
449                    "abort multi failed, status code, status code: {}",
450                    resp.status()
451                )
452                .into(),
453            }))
454        }
455    }
456}