1use crate::acl;
3use crate::client;
4pub use crate::request::{
5 CompleteMultipartUpload, ErrNo, InitiateMultipartUploadResult, Part, Request, Response,
6};
7pub use mime;
8pub use quick_xml::de::from_str;
9pub use quick_xml::se::to_string;
10pub use reqwest::Body;
11use std::collections::HashMap;
12use std::fs;
13use std::io::Cursor;
14
15#[async_trait::async_trait]
16pub trait Objects {
17 async fn put_object(
19 &self,
20 content_type: mime::Mime,
21 key: &str,
22 data: Vec<u8>,
23 acl_header: Option<&acl::AclHeader>,
24 ) -> Response;
25
26 async fn put_big_object(
28 &self,
29 file_path: &str,
30 key: &str,
31 content_type: mime::Mime,
32 storage_class: &str,
33 acl_header: Option<&acl::AclHeader>,
34 part_size: u64,
35 ) -> Response;
36
37 async fn head_object(&self, key: &str) -> Response;
39
40 async fn delete_object(&self, key: &str) -> Response;
42
43 async fn get_object_binary(&self, key: &str) -> Response;
45
46 async fn get_object(&self, key: &str, file_name: &str) -> Response;
48
49 async fn put_object_get_upload_id(
51 &self,
52 key: &str,
53 content_type: &mime::Mime,
54 storage_class: &str,
55 acl_header: Option<&acl::AclHeader>,
56 ) -> Response;
57
58 async fn put_object_part(
60 &self,
61 key: &str,
62 upload_id: &str,
63 part_number: u64,
64 body: Vec<u8>,
65 content_type: &mime::Mime,
66 acl_header: Option<&acl::AclHeader>,
67 ) -> Response;
68
69 async fn put_object_complete_part(
71 &self,
72 key: &str,
73 etag_map: &HashMap<u64, String>,
74 upload_id: &str,
75 ) -> Response;
76
77 async fn abort_object_part(&self, key: &str, upload_id: &str) -> Response;
81}
82
83#[async_trait::async_trait]
84impl Objects for client::Client {
85 async fn put_object(
103 &self,
104 content_type: mime::Mime,
105 key: &str,
106 data: Vec<u8>,
107 acl_header: Option<&acl::AclHeader>,
108 ) -> Response {
109 let mut headers = self.gen_common_headers();
110 headers.insert("Content-Type".to_string(), content_type.to_string());
111 headers.insert("Content-Length".to_string(), data.len().to_string());
112 let url_path = self.get_path_from_object_key(key);
113 headers =
114 self.get_headers_with_auth("put", url_path.as_str(), acl_header, Some(headers), None);
115 let resp = Request::put(
116 self.get_full_url_from_path(url_path.as_str()).as_str(),
117 None,
118 Some(&headers),
119 None,
120 None,
121 Some(reqwest::Body::from(data)),
122 )
123 .await;
124 self.make_response(resp)
125 }
126
127 async fn put_big_object(
145 &self,
146 file_path: &str,
147 key: &str,
148 content_type: mime::Mime,
149 storage_class: &str,
150 acl_header: Option<&acl::AclHeader>,
151 part_size: u64,
152 ) -> Response {
153 use tokio::io::AsyncReadExt;
154 use tokio::io::AsyncSeekExt;
155 use tokio::io::SeekFrom;
156 assert!(part_size > 0);
157 let mut file = match tokio::fs::File::open(file_path).await {
158 Ok(file) => file,
159 Err(e) => {
160 return Response::new(
161 ErrNo::IO,
162 format!("打开文件失败: {}, {}", file_path, e),
163 Default::default(),
164 )
165 }
166 };
167 let file_size = match file.metadata().await {
169 Ok(meta) => meta.len(),
170 Err(e) => {
171 return Response::new(
172 ErrNo::IO,
173 format!("获取文件大小失败: {}, {}", file_path, e),
174 Default::default(),
175 )
176 }
177 };
178 let mut part_number = 1;
179 let mut start: u64;
180 let mut etag_map = HashMap::new();
181 let upload_id = self
182 .put_object_get_upload_id(key, &content_type, storage_class, acl_header)
183 .await;
184 if upload_id.error_no != ErrNo::SUCCESS {
185 return upload_id;
186 }
187 let upload_id = String::from_utf8_lossy(&upload_id.result[..]).to_string();
188 loop {
189 start = part_size * (part_number - 1);
190 if start >= file_size {
191 let resp = self
193 .put_object_complete_part(key, &etag_map, upload_id.as_str())
194 .await;
195 if resp.error_no != ErrNo::SUCCESS {
196 self.abort_object_part(key, upload_id.as_str()).await;
198 }
199 return resp;
200 }
201 let mut size = part_size;
202 if file_size - start < part_size {
204 size = file_size - start;
205 }
206 if file_size - size - start <= 1024 * 1024 {
208 size = file_size - start;
209 }
210 if let Err(e) = file.seek(SeekFrom::Start(start)).await {
211 self.abort_object_part(key, upload_id.as_str()).await;
213 return Response::new(
214 ErrNo::IO,
215 format!("设置文件指针失败: {}, {}", file_path, e),
216 Default::default(),
217 );
218 }
219 let mut body: Vec<u8> = vec![0; size as usize];
220 if let Err(e) = file.read_exact(&mut body).await {
221 self.abort_object_part(key, upload_id.as_str()).await;
223 return Response::new(
224 ErrNo::IO,
225 format!("读取文件失败: {}, {}", file_path, e),
226 Default::default(),
227 );
228 }
229 let resp = self
230 .put_object_part(
231 key,
232 upload_id.as_str(),
233 part_number,
234 body,
235 &content_type,
236 acl_header,
237 )
238 .await;
239 if resp.error_no != ErrNo::SUCCESS {
240 self.abort_object_part(key, upload_id.as_str()).await;
242 return resp;
243 }
244 etag_map.insert(part_number, resp.headers["etag"].clone());
245 part_number += 1;
246 }
247 }
248
249 async fn head_object(&self, key: &str) -> Response {
250 let url_path = self.get_path_from_object_key(key);
251 let headers = self.get_headers_with_auth("head", url_path.as_str(), None, None, None);
252 let resp = Request::head(
253 self.get_full_url_from_path(url_path.as_str()).as_str(),
254 None,
255 Some(&headers),
256 )
257 .await;
258 match resp {
259 Ok(e) => e,
260 Err(e) => e,
261 }
262 }
263
264 async fn delete_object(&self, key: &str) -> Response {
277 let url_path = self.get_path_from_object_key(key);
278 let headers = self.get_headers_with_auth("delete", url_path.as_str(), None, None, None);
279 let resp = Request::delete(
280 self.get_full_url_from_path(url_path.as_str()).as_str(),
281 None,
282 Some(&headers),
283 None,
284 None,
285 )
286 .await;
287 match resp {
288 Ok(e) => e,
289 Err(e) => e,
290 }
291 }
292
293 async fn get_object_binary(&self, key: &str) -> Response {
306 let url_path = self.get_path_from_object_key(key);
307 let headers = self.get_headers_with_auth("get", url_path.as_str(), None, None, None);
308 let resp = Request::get(
309 self.get_full_url_from_path(url_path.as_str()).as_str(),
310 None,
311 Some(&headers),
312 )
313 .await;
314 self.make_response(resp)
315 }
316
317 async fn get_object(&self, key: &str, file_name: &str) -> Response {
330 let resp = self.get_object_binary(key).await;
331 if resp.error_no == ErrNo::SUCCESS {
332 let output_file_r = fs::File::create(file_name);
333 let mut output_file;
334 match output_file_r {
335 Ok(e) => output_file = e,
336 Err(e) => {
337 return Response::new(
338 ErrNo::OTHER,
339 format!("创建文件失败: {}", e),
340 "".to_string(),
341 );
342 }
343 }
344 if let Err(e) = std::io::copy(&mut Cursor::new(resp.result), &mut output_file) {
345 return Response::new(ErrNo::OTHER, format!("下载文件失败: {}", e), "".to_string());
346 }
347 return Response::blank_success();
348 }
349 resp
350 }
351 async fn put_object_get_upload_id(
354 &self,
355 key: &str,
356 content_type: &mime::Mime,
357 storage_class: &str,
358 acl_header: Option<&acl::AclHeader>,
359 ) -> Response {
360 let mut query = HashMap::new();
361 query.insert("uploads".to_string(), "".to_string());
362 let url_path = self.get_path_from_object_key(key);
363 let mut headers = self.gen_common_headers();
364 headers.insert("Content-Type".to_string(), content_type.to_string());
365 headers.insert("x-cos-storage-class".to_string(), storage_class.to_string());
366 let headers = self.get_headers_with_auth(
367 "post",
368 url_path.as_str(),
369 acl_header,
370 Some(headers),
371 Some(&query),
372 );
373 let resp = Request::post(
374 self.get_full_url_from_path(url_path.as_str()).as_str(),
375 Some(&query),
376 Some(&headers),
377 None,
378 None,
379 None as Option<Body>,
380 )
381 .await;
382 match resp {
383 Ok(res) => {
384 if res.error_no != ErrNo::SUCCESS {
385 return res;
386 }
387 match quick_xml::de::from_slice::<InitiateMultipartUploadResult>(&res.result[..]) {
388 Ok(res) => Response::new(ErrNo::SUCCESS, "".to_string(), res.upload_id),
389 Err(e) => Response::new(ErrNo::DECODE, e.to_string(), Default::default()),
390 }
391 }
392 Err(e) => e,
393 }
394 }
395
396 async fn put_object_part(
399 &self,
400 key: &str,
401 upload_id: &str,
402 part_number: u64,
403 body: Vec<u8>,
404 content_type: &mime::Mime,
405 acl_header: Option<&acl::AclHeader>,
406 ) -> Response {
407 let mut headers = self.gen_common_headers();
408 headers.insert("Content-Type".to_string(), content_type.to_string());
409 headers.insert("Content-Length".to_string(), body.len().to_string());
410 let url_path = self.get_path_from_object_key(key);
411 let mut query = HashMap::new();
412 query.insert("partNumber".to_string(), part_number.to_string());
413 query.insert("uploadId".to_string(), upload_id.to_string());
414 headers = self.get_headers_with_auth(
415 "put",
416 url_path.as_str(),
417 acl_header,
418 Some(headers),
419 Some(&query),
420 );
421 let resp = Request::put(
422 self.get_full_url_from_path(url_path.as_str()).as_str(),
423 Some(&query),
424 Some(&headers),
425 None,
426 None,
427 Some(body),
428 )
429 .await;
430 self.make_response(resp)
431 }
432
433 async fn put_object_complete_part(
436 &self,
437 key: &str,
438 etag_map: &HashMap<u64, String>,
439 upload_id: &str,
440 ) -> Response {
441 let url_path = self.get_path_from_object_key(key);
442 let mut query = HashMap::new();
443 query.insert("uploadId".to_string(), upload_id.to_string());
444 let mut headers = self.gen_common_headers();
445 headers.insert("Content-Type".to_string(), "application/xml".to_string());
446 let headers = self.get_headers_with_auth(
447 "post",
448 url_path.as_str(),
449 None,
450 Some(headers),
451 Some(&query),
452 );
453 let mut parts = Vec::new();
454 let mut keys = Vec::new();
456 for k in etag_map.keys() {
457 keys.push(k);
458 }
459 keys.sort();
460 for k in keys {
461 parts.push(Part {
462 part_number: *k,
463 etag: etag_map[k].clone(),
464 })
465 }
466 let complete = CompleteMultipartUpload { part: parts };
467 let serialized_str = match to_string(&complete) {
468 Ok(s) => s,
469 Err(e) => return Response::new(ErrNo::ENCODE, e.to_string(), Default::default()),
470 };
471 let resp = Request::post(
472 self.get_full_url_from_path(url_path.as_str()).as_str(),
473 Some(&query),
474 Some(&headers),
475 None,
476 None,
477 Some(serialized_str),
478 )
479 .await;
480 self.make_response(resp)
481 }
482
483 async fn abort_object_part(&self, key: &str, upload_id: &str) -> Response {
486 let url_path = self.get_path_from_object_key(key);
487 let mut query = HashMap::new();
488 query.insert("uploadId".to_string(), upload_id.to_string());
489 let headers =
490 self.get_headers_with_auth("delete", url_path.as_str(), None, None, Some(&query));
491 let resp = Request::delete(
492 self.get_full_url_from_path(url_path.as_str()).as_str(),
493 Some(&query),
494 Some(&headers),
495 None,
496 None,
497 )
498 .await;
499 self.make_response(resp)
500 }
501}