tencent_qcloud_cos_rs/
objects.rs

1//! object操作相关
2use crate::acl;
3use crate::client;
4pub use crate::request::{
5    CompleteMultipartUpload, ErrNo, InitiateMultipartUploadResult, Part, Request, Response,
6};
7pub use mime;
8pub use quick_xml::de::from_str;
9pub use quick_xml::se::to_string;
10pub use reqwest::Body;
11use std::collections::HashMap;
12use std::fs;
13use std::io::Cursor;
14
15#[async_trait::async_trait]
16pub trait Objects {
17    /// 上传本地小文件
18    async fn put_object(
19        &self,
20        file_path: &str,
21        key: &str,
22        content_type: mime::Mime,
23        acl_header: Option<&acl::AclHeader>,
24    ) -> Response;
25
26    /// 上传本地大文件
27    async fn put_big_object(
28        &self,
29        file_path: &str,
30        key: &str,
31        content_type: mime::Mime,
32        storage_class: &str,
33        acl_header: Option<&acl::AclHeader>,
34        part_size: u64,
35    ) -> Response;
36
37    /// 上传二进制流
38    async fn put_object_binary<T: Into<Body> + Send>(
39        &self,
40        file: T,
41        key: &str,
42        content_type: mime::Mime,
43        acl_header: Option<&acl::AclHeader>,
44    ) -> Response;
45
46    /// 删除文件
47    async fn delete_object(&self, key: &str) -> Response;
48
49    /// 获取文件二进制流
50    async fn get_object_binary(&self, key: &str) -> Response;
51
52    /// 下载文件到本地
53    async fn get_object(&self, key: &str, file_name: &str) -> Response;
54
55    /// 获取分块上传的upload_id
56    async fn put_object_get_upload_id(
57        &self,
58        key: &str,
59        content_type: &mime::Mime,
60        storage_class: &str,
61        acl_header: Option<&acl::AclHeader>,
62    ) -> Response;
63
64    /// 分块上传
65    async fn put_object_part(
66        &self,
67        key: &str,
68        upload_id: &str,
69        part_number: u64,
70        body: Vec<u8>,
71        content_type: &mime::Mime,
72        acl_header: Option<&acl::AclHeader>,
73    ) -> Response;
74
75    /// 完成分块上传
76    async fn put_object_complete_part(
77        &self,
78        key: &str,
79        etag_map: &HashMap<u64, String>,
80        upload_id: &str,
81    ) -> Response;
82
83    /// Abort Multipart Upload 用来实现舍弃一个分块上传并删除已上传的块。
84    /// 当您调用 Abort Multipart Upload 时,如果有正在使用这个 Upload Parts 上传块的请求,
85    /// 则 Upload Parts 会返回失败。当该 UploadId 不存在时,会返回404 NoSuchUpload。
86    async fn abort_object_part(&self, key: &str, upload_id: &str) -> Response;
87}
88
89#[async_trait::async_trait]
90impl Objects for client::Client {
91    /// 上传本地小文件
92    /// 见[官网文档](https://cloud.tencent.com/document/product/436/7749)
93    /// # Examples
94    /// ```
95    /// use tencent_qcloud_cos_rs::client::Client;
96    /// use tencent_qcloud_cos_rs::objects::Objects;
97    /// use mime;
98    /// use tencent_qcloud_cos_rs::acl::{AclHeader, ObjectAcl};
99    /// async {
100    /// let mut acl_header = AclHeader::new();
101    /// acl_header.insert_object_x_cos_acl(ObjectAcl::AuthenticatedRead);
102    /// let client = Client::new("foo", "bar", None, "qcloudtest-1256650966", "ap-guangzhou");
103    /// let res = client.put_object("Cargo.toml", "Cargo.toml", mime::TEXT_PLAIN_UTF_8, Some(&acl_header)).await;
104    /// assert!(res.error_message.contains("403"));
105    /// };
106    /// ```
107    async fn put_object(
108        &self,
109        file_path: &str,
110        key: &str,
111        content_type: mime::Mime,
112        acl_header: Option<&acl::AclHeader>,
113    ) -> Response {
114        let file = match tokio::fs::File::open(file_path).await {
115            Ok(file) => file,
116            Err(e) => {
117                return Response::new(
118                    ErrNo::IO,
119                    format!("打开文件失败: {}, {}", file_path, e),
120                    Default::default(),
121                )
122            }
123        };
124        // 设置为分块上传或者大于5G会启动分块上传
125        let file_size = match file.metadata().await {
126            Ok(meta) => meta.len() as usize,
127            Err(e) => {
128                return Response::new(
129                    ErrNo::IO,
130                    format!("获取文件大小失败: {}, {}", file_path, e),
131                    Default::default(),
132                )
133            }
134        };
135        let mut headers = self.gen_common_headers();
136        headers.insert("Content-Type".to_string(), content_type.to_string());
137        headers.insert("Content-Length".to_string(), file_size.to_string());
138        let url_path = self.get_path_from_object_key(key);
139        headers =
140            self.get_headers_with_auth("put", url_path.as_str(), acl_header, Some(headers), None);
141        let resp = Request::put(
142            self.get_full_url_from_path(url_path.as_str()).as_str(),
143            None,
144            Some(&headers),
145            None,
146            None,
147            Some(file),
148        )
149        .await;
150        self.make_response(resp)
151    }
152
153    /// 上传本地大文件
154    /// 见[官网文档](https://cloud.tencent.com/document/product/436/7749)
155    /// # Examples
156    /// ```
157    /// use tencent_qcloud_cos_rs::client::Client;
158    /// use tencent_qcloud_cos_rs::objects::Objects;
159    /// use mime;
160    /// use tencent_qcloud_cos_rs::acl::{AclHeader, ObjectAcl};
161    /// async {
162    /// let mut acl_header = AclHeader::new();
163    /// acl_header.insert_object_x_cos_acl(ObjectAcl::AuthenticatedRead);
164    /// let client = Client::new("foo", "bar", None, "qcloudtest-1256650966", "ap-guangzhou");
165    /// // 分块传输
166    /// let res = client.put_big_object("Cargo.toml","Cargo.toml", mime::TEXT_PLAIN_UTF_8, "ARCHIVE", Some(&acl_header), 1024 * 1024 * 100).await;
167    /// assert!(res.error_message.contains("403"));
168    /// };
169    /// ```
170    async fn put_big_object(
171        &self,
172        file_path: &str,
173        key: &str,
174        content_type: mime::Mime,
175        storage_class: &str,
176        acl_header: Option<&acl::AclHeader>,
177        part_size: u64,
178    ) -> Response {
179        use tokio::io::AsyncReadExt;
180        use tokio::io::AsyncSeekExt;
181        use tokio::io::SeekFrom;
182        assert!(part_size > 0);
183        let mut file = match tokio::fs::File::open(file_path).await {
184            Ok(file) => file,
185            Err(e) => {
186                return Response::new(
187                    ErrNo::IO,
188                    format!("打开文件失败: {}, {}", file_path, e),
189                    Default::default(),
190                )
191            }
192        };
193        // 设置为分块上传或者大于5G会启动分块上传
194        let file_size = match file.metadata().await {
195            Ok(meta) => meta.len(),
196            Err(e) => {
197                return Response::new(
198                    ErrNo::IO,
199                    format!("获取文件大小失败: {}, {}", file_path, e),
200                    Default::default(),
201                )
202            }
203        };
204        let mut part_number = 1;
205        let mut start;
206        let mut etag_map = HashMap::new();
207        let upload_id = self
208            .put_object_get_upload_id(key, &content_type, storage_class, acl_header)
209            .await;
210        if upload_id.error_no != ErrNo::SUCCESS {
211            return upload_id;
212        }
213        let upload_id = String::from_utf8_lossy(&upload_id.result[..]).to_string();
214        loop {
215            start = part_size * (part_number - 1);
216            if start >= file_size {
217                // 调用合并
218                let resp = self
219                    .put_object_complete_part(key, &etag_map, upload_id.as_str())
220                    .await;
221                if resp.error_no != ErrNo::SUCCESS {
222                    // 调用清理
223                    self.abort_object_part(key, upload_id.as_str()).await;
224                }
225                return resp;
226            }
227            let mut size = part_size;
228            // 计算剩余的大小
229            if file_size - start < part_size {
230                size = file_size - start;
231            }
232            // 如果剩余的块小于1M, 那么要全部上传
233            if file_size - size - start <= 1024 * 1024 {
234                size = file_size - start;
235            }
236            if let Err(e) = file.seek(SeekFrom::Start(start as u64)).await {
237                // 调用清理
238                self.abort_object_part(key, upload_id.as_str()).await;
239                return Response::new(
240                    ErrNo::IO,
241                    format!("设置文件指针失败: {}, {}", file_path, e),
242                    Default::default(),
243                );
244            }
245            let mut body: Vec<u8> = vec![0; size as usize];
246            if let Err(e) = file.read_exact(&mut body).await {
247                // 调用清理
248                self.abort_object_part(key, upload_id.as_str()).await;
249                return Response::new(
250                    ErrNo::IO,
251                    format!("读取文件失败: {}, {}", file_path, e),
252                    Default::default(),
253                );
254            }
255            let resp = self
256                .put_object_part(
257                    key,
258                    upload_id.as_str(),
259                    part_number,
260                    body,
261                    &content_type,
262                    acl_header,
263                )
264                .await;
265            if resp.error_no != ErrNo::SUCCESS {
266                // 调用清理
267                self.abort_object_part(key, upload_id.as_str()).await;
268                return resp;
269            }
270            etag_map.insert(part_number, resp.headers["etag"].clone());
271            part_number += 1;
272        }
273    }
274
275    /// 上传二进制流
276    /// 见[官网文档](https://cloud.tencent.com/document/product/436/7749)
277    /// # Examples
278    /// ```
279    /// use tencent_qcloud_cos_rs::client::Client;
280    /// use tencent_qcloud_cos_rs::objects::Objects;
281    /// use mime;
282    /// use tencent_qcloud_cos_rs::acl::{AclHeader, ObjectAcl};
283    /// async {
284    /// let mut acl_header = AclHeader::new();
285    /// acl_header.insert_object_x_cos_acl(ObjectAcl::AuthenticatedRead);
286    /// let client = Client::new("foo", "bar", None, "qcloudtest-1256650966", "ap-guangzhou");
287    /// let buffer = std::fs::read("Cargo.toml").unwrap();
288    /// let res = client.put_object_binary(buffer, "Cargo.toml", mime::TEXT_PLAIN_UTF_8, Some(&acl_header)).await;
289    /// assert!(res.error_message.contains("403"));
290    /// };
291    /// ```
292    async fn put_object_binary<T: Into<Body> + Send>(
293        &self,
294        file: T,
295        key: &str,
296        content_type: mime::Mime,
297        acl_header: Option<&acl::AclHeader>,
298    ) -> Response {
299        let body: Body = file.into();
300        let bytes = body.as_bytes();
301        if bytes.is_none() {
302            return Response::new(ErrNo::IO, "不是内存对象".to_owned(), Default::default());
303        }
304        let file_size = bytes.unwrap().len();
305        let mut headers = self.gen_common_headers();
306        headers.insert("Content-Type".to_string(), content_type.to_string());
307        headers.insert("Content-Length".to_string(), file_size.to_string());
308        let url_path = self.get_path_from_object_key(key);
309        headers =
310            self.get_headers_with_auth("put", url_path.as_str(), acl_header, Some(headers), None);
311        let resp = Request::put(
312            self.get_full_url_from_path(url_path.as_str()).as_str(),
313            None,
314            Some(&headers),
315            None,
316            None,
317            Some(body),
318        )
319        .await;
320        self.make_response(resp)
321    }
322    /// 删除文件
323    /// 见[官网文档](https://cloud.tencent.com/document/product/436/7743)
324    /// # Examples
325    /// ```
326    /// use tencent_qcloud_cos_rs::client::Client;
327    /// use tencent_qcloud_cos_rs::objects::Objects;
328    /// async {
329    /// let client = Client::new("foo", "bar", None, "qcloudtest-1256650966", "ap-guangzhou");
330    /// let res = client.delete_object("Cargo.toml").await;
331    /// assert!(res.error_message.contains("403"))
332    /// };
333    /// ```
334    async fn delete_object(&self, key: &str) -> Response {
335        let url_path = self.get_path_from_object_key(key);
336        let headers = self.get_headers_with_auth("delete", url_path.as_str(), None, None, None);
337        let resp = Request::delete(
338            self.get_full_url_from_path(url_path.as_str()).as_str(),
339            None,
340            Some(&headers),
341            None,
342            None,
343        )
344        .await;
345        match resp {
346            Ok(e) => e,
347            Err(e) => e,
348        }
349    }
350
351    /// 下载文件二进制流
352    /// 见[官网文档](https://cloud.tencent.com/document/product/436/7753)
353    /// # Examples
354    /// ```
355    /// use tencent_qcloud_cos_rs::client::Client;
356    /// use tencent_qcloud_cos_rs::objects::Objects;
357    /// async {
358    /// let client = Client::new("foo", "bar", None, "qcloudtest-1256650966", "ap-guangzhou");
359    /// let res = client.get_object_binary("Cargo.toml").await;
360    /// assert!(res.error_message.contains("403"));
361    /// };
362    /// ```
363    async fn get_object_binary(&self, key: &str) -> Response {
364        let url_path = self.get_path_from_object_key(key);
365        let headers = self.get_headers_with_auth("get", url_path.as_str(), None, None, None);
366        let resp = Request::get(
367            self.get_full_url_from_path(url_path.as_str()).as_str(),
368            None,
369            Some(&headers),
370        )
371        .await;
372        self.make_response(resp)
373    }
374
375    /// 下载文件到本地
376    /// 见[官网文档](https://cloud.tencent.com/document/product/436/7753)
377    /// # Examples
378    /// ```
379    /// use tencent_qcloud_cos_rs::client::Client;
380    /// use tencent_qcloud_cos_rs::objects::Objects;
381    /// async {
382    /// let client = Client::new("foo", "bar", None, "qcloudtest-1256650966", "ap-guangzhou");
383    /// let res = client.get_object("Cargo.toml", "Cargo.toml").await;
384    /// assert!(res.error_message.contains("403"));
385    /// };
386    /// ```
387    async fn get_object(&self, key: &str, file_name: &str) -> Response {
388        let resp = self.get_object_binary(key).await;
389        if resp.error_no == ErrNo::SUCCESS {
390            let output_file_r = fs::File::create(file_name);
391            let mut output_file;
392            match output_file_r {
393                Ok(e) => output_file = e,
394                Err(e) => {
395                    return Response::new(
396                        ErrNo::OTHER,
397                        format!("创建文件失败: {}", e),
398                        "".to_string(),
399                    );
400                }
401            }
402            if let Err(e) = std::io::copy(&mut Cursor::new(resp.result), &mut output_file) {
403                return Response::new(ErrNo::OTHER, format!("下载文件失败: {}", e), "".to_string());
404            }
405            return Response::blank_success();
406        }
407        resp
408    }
409    /// 请求实现初始化分块上传,成功执行此请求后将返回 UploadId,用于后续的 Upload Part 请求
410    /// [官网文档](https://cloud.tencent.com/document/product/436/7746)
411    async fn put_object_get_upload_id(
412        &self,
413        key: &str,
414        content_type: &mime::Mime,
415        storage_class: &str,
416        acl_header: Option<&acl::AclHeader>,
417    ) -> Response {
418        let mut query = HashMap::new();
419        query.insert("uploads".to_string(), "".to_string());
420        let url_path = self.get_path_from_object_key(key);
421        let mut headers = self.gen_common_headers();
422        headers.insert("Content-Type".to_string(), content_type.to_string());
423        headers.insert("x-cos-storage-class".to_string(), storage_class.to_string());
424        let headers = self.get_headers_with_auth(
425            "post",
426            url_path.as_str(),
427            acl_header,
428            Some(headers),
429            Some(&query),
430        );
431        let resp = Request::post(
432            self.get_full_url_from_path(url_path.as_str()).as_str(),
433            Some(&query),
434            Some(&headers),
435            None,
436            None,
437            None as Option<Body>,
438        )
439        .await;
440        match resp {
441            Ok(res) => {
442                if res.error_no != ErrNo::SUCCESS {
443                    return res;
444                }
445                match quick_xml::de::from_slice::<InitiateMultipartUploadResult>(&res.result[..]) {
446                    Ok(res) => Response::new(ErrNo::SUCCESS, "".to_string(), res.upload_id),
447                    Err(e) => Response::new(ErrNo::DECODE, e.to_string(), Default::default()),
448                }
449            }
450            Err(e) => e,
451        }
452    }
453
454    /// 分块上传文件
455    /// [官网文档](https://cloud.tencent.com/document/product/436/7750)
456    async fn put_object_part(
457        &self,
458        key: &str,
459        upload_id: &str,
460        part_number: u64,
461        body: Vec<u8>,
462        content_type: &mime::Mime,
463        acl_header: Option<&acl::AclHeader>,
464    ) -> Response {
465        let mut headers = self.gen_common_headers();
466        headers.insert("Content-Type".to_string(), content_type.to_string());
467        headers.insert("Content-Length".to_string(), body.len().to_string());
468        let url_path = self.get_path_from_object_key(key);
469        let mut query = HashMap::new();
470        query.insert("partNumber".to_string(), part_number.to_string());
471        query.insert("uploadId".to_string(), upload_id.to_string());
472        headers = self.get_headers_with_auth(
473            "put",
474            url_path.as_str(),
475            acl_header,
476            Some(headers),
477            Some(&query),
478        );
479        let resp = Request::put(
480            self.get_full_url_from_path(url_path.as_str()).as_str(),
481            Some(&query),
482            Some(&headers),
483            None,
484            None,
485            Some(body),
486        )
487        .await;
488        self.make_response(resp)
489    }
490
491    /// 完成分块上传
492    /// [官网文档](https://cloud.tencent.com/document/product/436/7742)
493    async fn put_object_complete_part(
494        &self,
495        key: &str,
496        etag_map: &HashMap<u64, String>,
497        upload_id: &str,
498    ) -> Response {
499        let url_path = self.get_path_from_object_key(key);
500        let mut query = HashMap::new();
501        query.insert("uploadId".to_string(), upload_id.to_string());
502        let mut headers = self.gen_common_headers();
503        headers.insert("Content-Type".to_string(), "application/xml".to_string());
504        let headers = self.get_headers_with_auth(
505            "post",
506            url_path.as_str(),
507            None,
508            Some(headers),
509            Some(&query),
510        );
511        let mut parts = Vec::new();
512        // 按part_number排序
513        let mut keys = Vec::new();
514        for k in etag_map.keys() {
515            keys.push(k);
516        }
517        keys.sort();
518        for k in keys {
519            parts.push(Part {
520                part_number: k.clone(),
521                etag: etag_map[&k].clone(),
522            })
523        }
524        let complete = CompleteMultipartUpload { part: parts };
525        let serialized_str = match to_string(&complete) {
526            Ok(s) => s,
527            Err(e) => return Response::new(ErrNo::ENCODE, e.to_string(), Default::default()),
528        };
529        let resp = Request::post(
530            self.get_full_url_from_path(url_path.as_str()).as_str(),
531            Some(&query),
532            Some(&headers),
533            None,
534            None,
535            Some(serialized_str),
536        )
537        .await;
538        self.make_response(resp)
539    }
540
541    /// 终止分块上传,清理文件碎片
542    /// [官网文档](https://cloud.tencent.com/document/product/436/7740)
543    async fn abort_object_part(&self, key: &str, upload_id: &str) -> Response {
544        let url_path = self.get_path_from_object_key(key);
545        let mut query = HashMap::new();
546        query.insert("uploadId".to_string(), upload_id.to_string());
547        let headers =
548            self.get_headers_with_auth("delete", url_path.as_str(), None, None, Some(&query));
549        let resp = Request::delete(
550            self.get_full_url_from_path(url_path.as_str()).as_str(),
551            Some(&query),
552            Some(&headers),
553            None,
554            None,
555        )
556        .await;
557        self.make_response(resp)
558    }
559}