1use std::collections::HashMap;
2
3use crate::{
4 multi_part::{CompleteMultipartUploadResult, InitiateMultipartUploadResult},
5 oss::{ObjectMeta, RequestType},
6 prelude::{ListObjects, OSS},
7};
8
9use super::errors::{Error, ObjectError};
10
11use async_trait::async_trait;
12use bytes::Bytes;
13
14#[async_trait]
15pub trait AsyncObjectAPI {
16 async fn list_object<S, H, R>(&self, headers: H, resources: R) -> Result<ListObjects, Error>
17 where
18 S: AsRef<str>,
19 H: Into<Option<HashMap<S, S>>> + Send,
20 R: Into<Option<HashMap<S, Option<S>>>> + Send;
21
22 async fn get_object<S1, S2, H, R>(
23 &self,
24 object_name: S1,
25 headers: H,
26 resources: R,
27 ) -> Result<Bytes, Error>
28 where
29 S1: AsRef<str> + Send,
30 S2: AsRef<str> + Send,
31 H: Into<Option<HashMap<S2, S2>>> + Send,
32 R: Into<Option<HashMap<S2, Option<S2>>>> + Send;
33
34 async fn put_object<S1, S2, H, R>(
35 &self,
36 buf: &[u8],
37 object_name: S1,
38 headers: H,
39 resources: R,
40 ) -> Result<(), Error>
41 where
42 S1: AsRef<str> + Send,
43 S2: AsRef<str> + Send,
44 H: Into<Option<HashMap<S2, S2>>> + Send,
45 R: Into<Option<HashMap<S2, Option<S2>>>> + Send;
46
47 async fn copy_object_from_object<S1, S2, S3, H, R>(
48 &self,
49 src: S1,
50 dest: S2,
51 headers: H,
52 resources: R,
53 ) -> Result<(), Error>
54 where
55 S1: AsRef<str> + Send,
56 S2: AsRef<str> + Send,
57 S3: AsRef<str> + Send,
58 H: Into<Option<HashMap<S3, S3>>> + Send,
59 R: Into<Option<HashMap<S3, Option<S3>>>> + Send;
60
61 async fn delete_object<S>(&self, object_name: S) -> Result<(), Error>
62 where
63 S: AsRef<str> + Send;
64
65 async fn head_object<S>(&self, object_name: S) -> Result<ObjectMeta, Error>
66 where
67 S: AsRef<str> + Send;
68
69 async fn init_multi<S1, S2, H, R>(
71 &self,
72 object_name: S1,
73 headers: H,
74 resources: R,
75 ) -> Result<InitiateMultipartUploadResult, Error>
76 where
77 S1: AsRef<str> + Send,
78 S2: AsRef<str> + Send,
79 H: Into<Option<HashMap<S2, S2>>> + Send,
80 R: Into<Option<HashMap<S2, Option<S2>>>> + Send;
81
82 async fn upload_part<S1, S2, H, R>(
84 &self,
85 buf: &[u8],
86 object_name: S1,
87 headers: H,
88 resources: R,
89 ) -> Result<String, Error>
90 where
91 S1: AsRef<str> + Send,
92 S2: AsRef<str> + Send,
93 H: Into<Option<HashMap<S2, S2>>> + Send,
94 R: Into<Option<HashMap<S2, Option<S2>>>> + Send;
95
96 async fn complete_multi<S1, S2, H, R>(
132 &self,
133 body: String,
134 object_name: S1,
135 headers: H,
136 resources: R,
137 ) -> Result<CompleteMultipartUploadResult, Error>
138 where
139 S1: AsRef<str> + Send,
140 S2: AsRef<str> + Send,
141 H: Into<Option<HashMap<S2, S2>>> + Send,
142 R: Into<Option<HashMap<S2, Option<S2>>>> + Send;
143
144 async fn abort_multi<S1, S2, H, R>(
146 &self,
147 object_name: S1,
148 headers: H,
149 resources: R,
150 ) -> Result<(), Error>
151 where
152 S1: AsRef<str> + Send,
153 S2: AsRef<str> + Send,
154 H: Into<Option<HashMap<S2, S2>>> + Send,
155 R: Into<Option<HashMap<S2, Option<S2>>>> + Send;
156}
157
158#[async_trait]
159impl<'a> AsyncObjectAPI for OSS<'a> {
160 async fn list_object<S, H, R>(&self, headers: H, resources: R) -> Result<ListObjects, Error>
161 where
162 S: AsRef<str>,
163 H: Into<Option<HashMap<S, S>>> + Send,
164 R: Into<Option<HashMap<S, Option<S>>>> + Send,
165 {
166 let (host, headers) =
167 self.build_request(RequestType::Get, String::new(), headers, resources)?;
168
169 let resp = self.http_client.get(host).headers(headers).send().await?;
170 let body = resp.text().await?;
171 let list_objects = quick_xml::de::from_str::<ListObjects>(&body)?;
172
173 Ok(list_objects)
174 }
175
176 async fn get_object<S1, S2, H, R>(
177 &self,
178 object_name: S1,
179 headers: H,
180 resources: R,
181 ) -> Result<Bytes, Error>
182 where
183 S1: AsRef<str> + Send,
184 S2: AsRef<str> + Send,
185 H: Into<Option<HashMap<S2, S2>>> + Send,
186 R: Into<Option<HashMap<S2, Option<S2>>>> + Send,
187 {
188 let (host, headers) =
189 self.build_request(RequestType::Get, object_name, headers, resources)?;
190
191 let resp = self.http_client.get(&host).headers(headers).send().await?;
192
193 if resp.status().is_success() {
194 Ok(resp.bytes().await?)
195 } else {
196 Err(Error::Object(ObjectError::GetError {
197 msg: format!("can not get object, status code: {}", resp.status()).into(),
198 }))
199 }
200 }
201
202 async fn put_object<S1, S2, H, R>(
203 &self,
204 buf: &[u8],
205 object_name: S1,
206 headers: H,
207 resources: R,
208 ) -> Result<(), Error>
209 where
210 S1: AsRef<str> + Send,
211 S2: AsRef<str> + Send,
212 H: Into<Option<HashMap<S2, S2>>> + Send,
213 R: Into<Option<HashMap<S2, Option<S2>>>> + Send,
214 {
215 let (host, headers) =
216 self.build_request(RequestType::Put, object_name, headers, resources)?;
217
218 let resp = self
219 .http_client
220 .put(&host)
221 .headers(headers)
222 .body(buf.to_owned())
223 .send()
224 .await?;
225
226 if resp.status().is_success() {
227 Ok(())
228 } else {
229 Err(Error::Object(ObjectError::DeleteError {
230 msg: format!(
231 "can not put object, status code, status code: {}",
232 resp.status()
233 )
234 .into(),
235 }))
236 }
237 }
238
239 async fn copy_object_from_object<S1, S2, S3, H, R>(
240 &self,
241 src: S1,
242 dest: S2,
243 headers: H,
244 resources: R,
245 ) -> Result<(), Error>
246 where
247 S1: AsRef<str> + Send,
248 S2: AsRef<str> + Send,
249 S3: AsRef<str> + Send,
250 H: Into<Option<HashMap<S3, S3>>> + Send,
251 R: Into<Option<HashMap<S3, Option<S3>>>> + Send,
252 {
253 let (host, mut headers) = self.build_request(RequestType::Put, dest, headers, resources)?;
254 headers.insert("x-oss-copy-source", src.as_ref().parse()?);
255
256 let resp = self.http_client.put(&host).headers(headers).send().await?;
257
258 if resp.status().is_success() {
259 Ok(())
260 } else {
261 Err(Error::Object(ObjectError::CopyError {
262 msg: format!("can not copy object, status code: {}", resp.status()).into(),
263 }))
264 }
265 }
266
267 async fn delete_object<S>(&self, object_name: S) -> Result<(), Error>
268 where
269 S: AsRef<str> + Send,
270 {
271 let headers = HashMap::<String, String>::new();
272 let (host, headers) =
273 self.build_request(RequestType::Delete, object_name, Some(headers), None)?;
274
275 let resp = self
276 .http_client
277 .delete(&host)
278 .headers(headers)
279 .send()
280 .await?;
281
282 if resp.status().is_success() {
283 Ok(())
284 } else {
285 Err(Error::Object(ObjectError::DeleteError {
286 msg: format!("can not delete object, status code: {}", resp.status()).into(),
287 }))
288 }
289 }
290
291 async fn head_object<S>(&self, object_name: S) -> Result<ObjectMeta, Error>
292 where
293 S: AsRef<str> + Send,
294 {
295 let (host, headers) = self.build_request(
296 RequestType::Head,
297 object_name,
298 None::<HashMap<String, String>>,
299 None,
300 )?;
301
302 let resp = self.http_client.head(&host).headers(headers).send().await?;
303
304 if resp.status().is_success() {
305 Ok(ObjectMeta::from_header_map(resp.headers())?)
306 } else {
307 Err(Error::Object(ObjectError::DeleteError {
308 msg: format!("can not head object, status code: {}", resp.status()).into(),
309 }))
310 }
311 }
312
313 async fn init_multi<S1, S2, H, R>(
314 &self,
315 object_name: S1,
316 headers: H,
317 resources: R,
318 ) -> Result<InitiateMultipartUploadResult, Error>
319 where
320 S1: AsRef<str> + Send,
321 S2: AsRef<str> + Send,
322 H: Into<Option<HashMap<S2, S2>>> + Send,
323 R: Into<Option<HashMap<S2, Option<S2>>>> + Send,
324 {
325 let (host, headers) =
326 self.build_request(RequestType::Post, object_name, headers, resources)?;
327
328 let resp = self.http_client.post(&host).headers(headers).send().await?;
329
330 if resp.status().is_success() {
331 let body = resp.text().await?;
332 let res = quick_xml::de::from_str::<InitiateMultipartUploadResult>(&body)?;
333 Ok(res)
334 } else {
335 Err(Error::Object(ObjectError::PostError {
336 msg: format!(
337 "init multi failed, status code, status code: {}",
338 resp.status()
339 )
340 .into(),
341 }))
342 }
343 }
344
345 async fn upload_part<S1, S2, H, R>(
346 &self,
347 buf: &[u8],
348 object_name: S1,
349 headers: H,
350 resources: R,
351 ) -> Result<String, Error>
352 where
353 S1: AsRef<str> + Send,
354 S2: AsRef<str> + Send,
355 H: Into<Option<HashMap<S2, S2>>> + Send,
356 R: Into<Option<HashMap<S2, Option<S2>>>> + Send,
357 {
358 let (host, headers) =
359 self.build_request(RequestType::Put, object_name, headers, resources)?;
360
361 let resp = self
362 .http_client
363 .put(&host)
364 .headers(headers)
365 .body(buf.to_owned())
366 .send()
367 .await?;
368
369 if resp.status().is_success() {
370 let e_tag = resp.headers().get("ETag").unwrap().to_str().unwrap();
371 Ok(e_tag.to_string())
372 } else {
373 Err(Error::Object(ObjectError::PutError {
374 msg: format!(
375 "can not put object, status code, status code: {}",
376 resp.status()
377 )
378 .into(),
379 }))
380 }
381 }
382
383 async fn complete_multi<S1, S2, H, R>(
384 &self,
385 body: String,
386 object_name: S1,
387 headers: H,
388 resources: R,
389 ) -> Result<CompleteMultipartUploadResult, Error>
390 where
391 S1: AsRef<str> + Send,
392 S2: AsRef<str> + Send,
393 H: Into<Option<HashMap<S2, S2>>> + Send,
394 R: Into<Option<HashMap<S2, Option<S2>>>> + Send,
395 {
396 let (host, headers) =
397 self.build_request(RequestType::Post, object_name, headers, resources)?;
398
399 let resp = self
400 .http_client
401 .post(&host)
402 .headers(headers)
403 .body(body)
404 .send()
405 .await?;
406
407 if resp.status().is_success() {
408 let body = resp.text().await?;
409 let res = quick_xml::de::from_str::<CompleteMultipartUploadResult>(&body)?;
410 Ok(res)
411 } else {
412 Err(Error::Object(ObjectError::PostError {
413 msg: format!(
414 "complete multi failed, status code, status code: {}",
415 resp.status()
416 )
417 .into(),
418 }))
419 }
420 }
421
422 async fn abort_multi<S1, S2, H, R>(
423 &self,
424 object_name: S1,
425 headers: H,
426 resources: R,
427 ) -> Result<(), Error>
428 where
429 S1: AsRef<str> + Send,
430 S2: AsRef<str> + Send,
431 H: Into<Option<HashMap<S2, S2>>> + Send,
432 R: Into<Option<HashMap<S2, Option<S2>>>> + Send,
433 {
434 let (host, headers) =
435 self.build_request(RequestType::Delete, object_name, headers, resources)?;
436
437 let resp = self
438 .http_client
439 .delete(&host)
440 .headers(headers)
441 .send()
442 .await?;
443
444 if resp.status().is_success() {
445 Ok(())
446 } else {
447 Err(Error::Object(ObjectError::DeleteError {
448 msg: format!(
449 "abort multi failed, status code, status code: {}",
450 resp.status()
451 )
452 .into(),
453 }))
454 }
455 }
456}