rust_qcos/
objects.rs

//! Object operations: upload, download, existence check, deletion and multipart upload.
2use crate::acl;
3use crate::client;
4pub use crate::request::{
5    CompleteMultipartUpload, ErrNo, InitiateMultipartUploadResult, Part, Request, Response,
6};
7pub use mime;
8pub use quick_xml::de::from_str;
9pub use quick_xml::se::to_string;
10pub use reqwest::Body;
11use std::collections::HashMap;
12use std::fs;
13use std::io::Cursor;
14
15#[async_trait::async_trait]
16pub trait Objects {
17    /// 上传本地小文件
18    async fn put_object(
19        &self,
20        content_type: mime::Mime,
21        key: &str,
22        data: Vec<u8>,
23        acl_header: Option<&acl::AclHeader>,
24    ) -> Response;
25
26    /// 上传本地大文件
27    async fn put_big_object(
28        &self,
29        file_path: &str,
30        key: &str,
31        content_type: mime::Mime,
32        storage_class: &str,
33        acl_header: Option<&acl::AclHeader>,
34        part_size: u64,
35    ) -> Response;
36
37    /// 判断文件是否存在
38    async fn head_object(&self, key: &str) -> Response;
39
40    /// 删除文件
41    async fn delete_object(&self, key: &str) -> Response;
42
43    /// 获取文件二进制流
44    async fn get_object_binary(&self, key: &str) -> Response;
45
46    /// 下载文件到本地
47    async fn get_object(&self, key: &str, file_name: &str) -> Response;
48
49    /// 获取分块上传的upload_id
50    async fn put_object_get_upload_id(
51        &self,
52        key: &str,
53        content_type: &mime::Mime,
54        storage_class: &str,
55        acl_header: Option<&acl::AclHeader>,
56    ) -> Response;
57
58    /// 分块上传
59    async fn put_object_part(
60        &self,
61        key: &str,
62        upload_id: &str,
63        part_number: u64,
64        body: Vec<u8>,
65        content_type: &mime::Mime,
66        acl_header: Option<&acl::AclHeader>,
67    ) -> Response;
68
69    /// 完成分块上传
70    async fn put_object_complete_part(
71        &self,
72        key: &str,
73        etag_map: &HashMap<u64, String>,
74        upload_id: &str,
75    ) -> Response;
76
77    /// Abort Multipart Upload 用来实现舍弃一个分块上传并删除已上传的块。
78    /// 当您调用 Abort Multipart Upload 时,如果有正在使用这个 Upload Parts 上传块的请求,
79    /// 则 Upload Parts 会返回失败。当该 UploadId 不存在时,会返回404 NoSuchUpload。
80    async fn abort_object_part(&self, key: &str, upload_id: &str) -> Response;
81}
82
83#[async_trait::async_trait]
84impl Objects for client::Client {
85    /// 上传本地小文件
86    /// 见[官网文档](https://cloud.tencent.com/document/product/436/7749)
87    /// # Examples
88    /// ```
89    /// use rust_qcos::client::Client;
90    /// use rust_qcos::objects::Objects;
91    /// use mime;
92    /// use rust_qcos::acl::{AclHeader, ObjectAcl};
93    /// async {
94    /// let mut acl_header = AclHeader::new();
95    /// acl_header.insert_object_x_cos_acl(ObjectAcl::AuthenticatedRead);
96    /// let client = Client::new("foo", "bar", "qcloudtest-1256650966", "ap-guangzhou");
97    /// let data = std::fs::read("Cargo.toml").unwrap();
98    /// let res = client.put_object(mime::TEXT_PLAIN_UTF_8, "Cargo.toml", data, None).await;
99    /// assert!(res.error_message.contains("403"));
100    /// };
101    /// ```
102    async fn put_object(
103        &self,
104        content_type: mime::Mime,
105        key: &str,
106        data: Vec<u8>,
107        acl_header: Option<&acl::AclHeader>,
108    ) -> Response {
109        let mut headers = self.gen_common_headers();
110        headers.insert("Content-Type".to_string(), content_type.to_string());
111        headers.insert("Content-Length".to_string(), data.len().to_string());
112        let url_path = self.get_path_from_object_key(key);
113        headers =
114            self.get_headers_with_auth("put", url_path.as_str(), acl_header, Some(headers), None);
115        let resp = Request::put(
116            self.get_full_url_from_path(url_path.as_str()).as_str(),
117            None,
118            Some(&headers),
119            None,
120            None,
121            Some(reqwest::Body::from(data)),
122        )
123        .await;
124        self.make_response(resp)
125    }
126
127    /// 上传本地大文件
128    /// 见[官网文档](https://cloud.tencent.com/document/product/436/7749)
129    /// # Examples
130    /// ```
131    /// use rust_qcos::client::Client;
132    /// use rust_qcos::objects::Objects;
133    /// use mime;
134    /// use rust_qcos::acl::{AclHeader, ObjectAcl};
135    /// async {
136    /// let mut acl_header = AclHeader::new();
137    /// acl_header.insert_object_x_cos_acl(ObjectAcl::AuthenticatedRead);
138    /// let client = Client::new("foo", "bar", "qcloudtest-1256650966", "ap-guangzhou");
139    /// // 分块传输
140    /// let res = client.put_big_object("Cargo.toml","Cargo.toml", mime::TEXT_PLAIN_UTF_8, "ARCHIVE", Some(&acl_header), 1024 * 1024 * 100).await;
141    /// assert!(res.error_message.contains("403"));
142    /// };
143    /// ```
144    async fn put_big_object(
145        &self,
146        file_path: &str,
147        key: &str,
148        content_type: mime::Mime,
149        storage_class: &str,
150        acl_header: Option<&acl::AclHeader>,
151        part_size: u64,
152    ) -> Response {
153        use tokio::io::AsyncReadExt;
154        use tokio::io::AsyncSeekExt;
155        use tokio::io::SeekFrom;
156        assert!(part_size > 0);
157        let mut file = match tokio::fs::File::open(file_path).await {
158            Ok(file) => file,
159            Err(e) => {
160                return Response::new(
161                    ErrNo::IO,
162                    format!("打开文件失败: {}, {}", file_path, e),
163                    Default::default(),
164                )
165            }
166        };
167        // 设置为分块上传或者大于5G会启动分块上传
168        let file_size = match file.metadata().await {
169            Ok(meta) => meta.len(),
170            Err(e) => {
171                return Response::new(
172                    ErrNo::IO,
173                    format!("获取文件大小失败: {}, {}", file_path, e),
174                    Default::default(),
175                )
176            }
177        };
178        let mut part_number = 1;
179        let mut start: u64;
180        let mut etag_map = HashMap::new();
181        let upload_id = self
182            .put_object_get_upload_id(key, &content_type, storage_class, acl_header)
183            .await;
184        if upload_id.error_no != ErrNo::SUCCESS {
185            return upload_id;
186        }
187        let upload_id = String::from_utf8_lossy(&upload_id.result[..]).to_string();
188        loop {
189            start = part_size * (part_number - 1);
190            if start >= file_size {
191                // 调用合并
192                let resp = self
193                    .put_object_complete_part(key, &etag_map, upload_id.as_str())
194                    .await;
195                if resp.error_no != ErrNo::SUCCESS {
196                    // 调用清理
197                    self.abort_object_part(key, upload_id.as_str()).await;
198                }
199                return resp;
200            }
201            let mut size = part_size;
202            // 计算剩余的大小
203            if file_size - start < part_size {
204                size = file_size - start;
205            }
206            // 如果剩余的块小于1M, 那么要全部上传
207            if file_size - size - start <= 1024 * 1024 {
208                size = file_size - start;
209            }
210            if let Err(e) = file.seek(SeekFrom::Start(start)).await {
211                // 调用清理
212                self.abort_object_part(key, upload_id.as_str()).await;
213                return Response::new(
214                    ErrNo::IO,
215                    format!("设置文件指针失败: {}, {}", file_path, e),
216                    Default::default(),
217                );
218            }
219            let mut body: Vec<u8> = vec![0; size as usize];
220            if let Err(e) = file.read_exact(&mut body).await {
221                // 调用清理
222                self.abort_object_part(key, upload_id.as_str()).await;
223                return Response::new(
224                    ErrNo::IO,
225                    format!("读取文件失败: {}, {}", file_path, e),
226                    Default::default(),
227                );
228            }
229            let resp = self
230                .put_object_part(
231                    key,
232                    upload_id.as_str(),
233                    part_number,
234                    body,
235                    &content_type,
236                    acl_header,
237                )
238                .await;
239            if resp.error_no != ErrNo::SUCCESS {
240                // 调用清理
241                self.abort_object_part(key, upload_id.as_str()).await;
242                return resp;
243            }
244            etag_map.insert(part_number, resp.headers["etag"].clone());
245            part_number += 1;
246        }
247    }
248
249    async fn head_object(&self, key: &str) -> Response {
250        let url_path = self.get_path_from_object_key(key);
251        let headers = self.get_headers_with_auth("head", url_path.as_str(), None, None, None);
252        let resp = Request::head(
253            self.get_full_url_from_path(url_path.as_str()).as_str(),
254            None,
255            Some(&headers),
256        )
257        .await;
258        match resp {
259            Ok(e) => e,
260            Err(e) => e,
261        }
262    }
263
264    /// 删除文件
265    /// 见[官网文档](https://cloud.tencent.com/document/product/436/7743)
266    /// # Examples
267    /// ```
268    /// use rust_qcos::client::Client;
269    /// use rust_qcos::objects::Objects;
270    /// async {
271    /// let client = Client::new("foo", "bar", "qcloudtest-1256650966", "ap-guangzhou");
272    /// let res = client.delete_object("Cargo.toml").await;
273    /// assert!(res.error_message.contains("403"))
274    /// };
275    /// ```
276    async fn delete_object(&self, key: &str) -> Response {
277        let url_path = self.get_path_from_object_key(key);
278        let headers = self.get_headers_with_auth("delete", url_path.as_str(), None, None, None);
279        let resp = Request::delete(
280            self.get_full_url_from_path(url_path.as_str()).as_str(),
281            None,
282            Some(&headers),
283            None,
284            None,
285        )
286        .await;
287        match resp {
288            Ok(e) => e,
289            Err(e) => e,
290        }
291    }
292
293    /// 下载文件二进制流
294    /// 见[官网文档](https://cloud.tencent.com/document/product/436/7753)
295    /// # Examples
296    /// ```
297    /// use rust_qcos::client::Client;
298    /// use rust_qcos::objects::Objects;
299    /// async {
300    /// let client = Client::new("foo", "bar", "qcloudtest-1256650966", "ap-guangzhou");
301    /// let res = client.get_object_binary("Cargo.toml").await;
302    /// assert!(res.error_message.contains("403"));
303    /// };
304    /// ```
305    async fn get_object_binary(&self, key: &str) -> Response {
306        let url_path = self.get_path_from_object_key(key);
307        let headers = self.get_headers_with_auth("get", url_path.as_str(), None, None, None);
308        let resp = Request::get(
309            self.get_full_url_from_path(url_path.as_str()).as_str(),
310            None,
311            Some(&headers),
312        )
313        .await;
314        self.make_response(resp)
315    }
316
317    /// 下载文件到本地
318    /// 见[官网文档](https://cloud.tencent.com/document/product/436/7753)
319    /// # Examples
320    /// ```
321    /// use rust_qcos::client::Client;
322    /// use rust_qcos::objects::Objects;
323    /// async {
324    /// let client = Client::new("foo", "bar", "qcloudtest-1256650966", "ap-guangzhou");
325    /// let res = client.get_object("Cargo.toml", "Cargo.toml").await;
326    /// assert!(res.error_message.contains("403"));
327    /// };
328    /// ```
329    async fn get_object(&self, key: &str, file_name: &str) -> Response {
330        let resp = self.get_object_binary(key).await;
331        if resp.error_no == ErrNo::SUCCESS {
332            let output_file_r = fs::File::create(file_name);
333            let mut output_file;
334            match output_file_r {
335                Ok(e) => output_file = e,
336                Err(e) => {
337                    return Response::new(
338                        ErrNo::OTHER,
339                        format!("创建文件失败: {}", e),
340                        "".to_string(),
341                    );
342                }
343            }
344            if let Err(e) = std::io::copy(&mut Cursor::new(resp.result), &mut output_file) {
345                return Response::new(ErrNo::OTHER, format!("下载文件失败: {}", e), "".to_string());
346            }
347            return Response::blank_success();
348        }
349        resp
350    }
351    /// 请求实现初始化分块上传,成功执行此请求后将返回 UploadId,用于后续的 Upload Part 请求
352    /// [官网文档](https://cloud.tencent.com/document/product/436/7746)
353    async fn put_object_get_upload_id(
354        &self,
355        key: &str,
356        content_type: &mime::Mime,
357        storage_class: &str,
358        acl_header: Option<&acl::AclHeader>,
359    ) -> Response {
360        let mut query = HashMap::new();
361        query.insert("uploads".to_string(), "".to_string());
362        let url_path = self.get_path_from_object_key(key);
363        let mut headers = self.gen_common_headers();
364        headers.insert("Content-Type".to_string(), content_type.to_string());
365        headers.insert("x-cos-storage-class".to_string(), storage_class.to_string());
366        let headers = self.get_headers_with_auth(
367            "post",
368            url_path.as_str(),
369            acl_header,
370            Some(headers),
371            Some(&query),
372        );
373        let resp = Request::post(
374            self.get_full_url_from_path(url_path.as_str()).as_str(),
375            Some(&query),
376            Some(&headers),
377            None,
378            None,
379            None as Option<Body>,
380        )
381        .await;
382        match resp {
383            Ok(res) => {
384                if res.error_no != ErrNo::SUCCESS {
385                    return res;
386                }
387                match quick_xml::de::from_slice::<InitiateMultipartUploadResult>(&res.result[..]) {
388                    Ok(res) => Response::new(ErrNo::SUCCESS, "".to_string(), res.upload_id),
389                    Err(e) => Response::new(ErrNo::DECODE, e.to_string(), Default::default()),
390                }
391            }
392            Err(e) => e,
393        }
394    }
395
396    /// 分块上传文件
397    /// [官网文档](https://cloud.tencent.com/document/product/436/7750)
398    async fn put_object_part(
399        &self,
400        key: &str,
401        upload_id: &str,
402        part_number: u64,
403        body: Vec<u8>,
404        content_type: &mime::Mime,
405        acl_header: Option<&acl::AclHeader>,
406    ) -> Response {
407        let mut headers = self.gen_common_headers();
408        headers.insert("Content-Type".to_string(), content_type.to_string());
409        headers.insert("Content-Length".to_string(), body.len().to_string());
410        let url_path = self.get_path_from_object_key(key);
411        let mut query = HashMap::new();
412        query.insert("partNumber".to_string(), part_number.to_string());
413        query.insert("uploadId".to_string(), upload_id.to_string());
414        headers = self.get_headers_with_auth(
415            "put",
416            url_path.as_str(),
417            acl_header,
418            Some(headers),
419            Some(&query),
420        );
421        let resp = Request::put(
422            self.get_full_url_from_path(url_path.as_str()).as_str(),
423            Some(&query),
424            Some(&headers),
425            None,
426            None,
427            Some(body),
428        )
429        .await;
430        self.make_response(resp)
431    }
432
433    /// 完成分块上传
434    /// [官网文档](https://cloud.tencent.com/document/product/436/7742)
435    async fn put_object_complete_part(
436        &self,
437        key: &str,
438        etag_map: &HashMap<u64, String>,
439        upload_id: &str,
440    ) -> Response {
441        let url_path = self.get_path_from_object_key(key);
442        let mut query = HashMap::new();
443        query.insert("uploadId".to_string(), upload_id.to_string());
444        let mut headers = self.gen_common_headers();
445        headers.insert("Content-Type".to_string(), "application/xml".to_string());
446        let headers = self.get_headers_with_auth(
447            "post",
448            url_path.as_str(),
449            None,
450            Some(headers),
451            Some(&query),
452        );
453        let mut parts = Vec::new();
454        // 按part_number排序
455        let mut keys = Vec::new();
456        for k in etag_map.keys() {
457            keys.push(k);
458        }
459        keys.sort();
460        for k in keys {
461            parts.push(Part {
462                part_number: *k,
463                etag: etag_map[k].clone(),
464            })
465        }
466        let complete = CompleteMultipartUpload { part: parts };
467        let serialized_str = match to_string(&complete) {
468            Ok(s) => s,
469            Err(e) => return Response::new(ErrNo::ENCODE, e.to_string(), Default::default()),
470        };
471        let resp = Request::post(
472            self.get_full_url_from_path(url_path.as_str()).as_str(),
473            Some(&query),
474            Some(&headers),
475            None,
476            None,
477            Some(serialized_str),
478        )
479        .await;
480        self.make_response(resp)
481    }
482
483    /// 终止分块上传,清理文件碎片
484    /// [官网文档](https://cloud.tencent.com/document/product/436/7740)
485    async fn abort_object_part(&self, key: &str, upload_id: &str) -> Response {
486        let url_path = self.get_path_from_object_key(key);
487        let mut query = HashMap::new();
488        query.insert("uploadId".to_string(), upload_id.to_string());
489        let headers =
490            self.get_headers_with_auth("delete", url_path.as_str(), None, None, Some(&query));
491        let resp = Request::delete(
492            self.get_full_url_from_path(url_path.as_str()).as_str(),
493            Some(&query),
494            Some(&headers),
495            None,
496            None,
497        )
498        .await;
499        self.make_response(resp)
500    }
501}