1use crate::acl;
3use crate::client;
4pub use crate::request::{
5 CompleteMultipartUpload, ErrNo, InitiateMultipartUploadResult, Part, Request, Response,
6};
7pub use mime;
8pub use quick_xml::de::from_str;
9pub use quick_xml::se::to_string;
10pub use reqwest::Body;
11use std::collections::HashMap;
12use std::fs;
13use std::io::Cursor;
14
15#[async_trait::async_trait]
16pub trait Objects {
17 async fn put_object(
19 &self,
20 file_path: &str,
21 key: &str,
22 content_type: mime::Mime,
23 acl_header: Option<&acl::AclHeader>,
24 ) -> Response;
25
26 async fn put_big_object(
28 &self,
29 file_path: &str,
30 key: &str,
31 content_type: mime::Mime,
32 storage_class: &str,
33 acl_header: Option<&acl::AclHeader>,
34 part_size: u64,
35 ) -> Response;
36
37 async fn put_object_binary<T: Into<Body> + Send>(
39 &self,
40 file: T,
41 key: &str,
42 content_type: mime::Mime,
43 acl_header: Option<&acl::AclHeader>,
44 ) -> Response;
45
46 async fn delete_object(&self, key: &str) -> Response;
48
49 async fn get_object_binary(&self, key: &str) -> Response;
51
52 async fn get_object(&self, key: &str, file_name: &str) -> Response;
54
55 async fn put_object_get_upload_id(
57 &self,
58 key: &str,
59 content_type: &mime::Mime,
60 storage_class: &str,
61 acl_header: Option<&acl::AclHeader>,
62 ) -> Response;
63
64 async fn put_object_part(
66 &self,
67 key: &str,
68 upload_id: &str,
69 part_number: u64,
70 body: Vec<u8>,
71 content_type: &mime::Mime,
72 acl_header: Option<&acl::AclHeader>,
73 ) -> Response;
74
75 async fn put_object_complete_part(
77 &self,
78 key: &str,
79 etag_map: &HashMap<u64, String>,
80 upload_id: &str,
81 ) -> Response;
82
83 async fn abort_object_part(&self, key: &str, upload_id: &str) -> Response;
87}
88
89#[async_trait::async_trait]
90impl Objects for client::Client {
91 async fn put_object(
108 &self,
109 file_path: &str,
110 key: &str,
111 content_type: mime::Mime,
112 acl_header: Option<&acl::AclHeader>,
113 ) -> Response {
114 let file = match tokio::fs::File::open(file_path).await {
115 Ok(file) => file,
116 Err(e) => {
117 return Response::new(
118 ErrNo::IO,
119 format!("打开文件失败: {}, {}", file_path, e),
120 Default::default(),
121 )
122 }
123 };
124 let file_size = match file.metadata().await {
126 Ok(meta) => meta.len() as usize,
127 Err(e) => {
128 return Response::new(
129 ErrNo::IO,
130 format!("获取文件大小失败: {}, {}", file_path, e),
131 Default::default(),
132 )
133 }
134 };
135 let mut headers = self.gen_common_headers();
136 headers.insert("Content-Type".to_string(), content_type.to_string());
137 headers.insert("Content-Length".to_string(), file_size.to_string());
138 let url_path = self.get_path_from_object_key(key);
139 headers =
140 self.get_headers_with_auth("put", url_path.as_str(), acl_header, Some(headers), None);
141 let resp = Request::put(
142 self.get_full_url_from_path(url_path.as_str()).as_str(),
143 None,
144 Some(&headers),
145 None,
146 None,
147 Some(file),
148 )
149 .await;
150 self.make_response(resp)
151 }
152
153 async fn put_big_object(
171 &self,
172 file_path: &str,
173 key: &str,
174 content_type: mime::Mime,
175 storage_class: &str,
176 acl_header: Option<&acl::AclHeader>,
177 part_size: u64,
178 ) -> Response {
179 use tokio::io::AsyncReadExt;
180 use tokio::io::AsyncSeekExt;
181 use tokio::io::SeekFrom;
182 assert!(part_size > 0);
183 let mut file = match tokio::fs::File::open(file_path).await {
184 Ok(file) => file,
185 Err(e) => {
186 return Response::new(
187 ErrNo::IO,
188 format!("打开文件失败: {}, {}", file_path, e),
189 Default::default(),
190 )
191 }
192 };
193 let file_size = match file.metadata().await {
195 Ok(meta) => meta.len(),
196 Err(e) => {
197 return Response::new(
198 ErrNo::IO,
199 format!("获取文件大小失败: {}, {}", file_path, e),
200 Default::default(),
201 )
202 }
203 };
204 let mut part_number = 1;
205 let mut start;
206 let mut etag_map = HashMap::new();
207 let upload_id = self
208 .put_object_get_upload_id(key, &content_type, storage_class, acl_header)
209 .await;
210 if upload_id.error_no != ErrNo::SUCCESS {
211 return upload_id;
212 }
213 let upload_id = String::from_utf8_lossy(&upload_id.result[..]).to_string();
214 loop {
215 start = part_size * (part_number - 1);
216 if start >= file_size {
217 let resp = self
219 .put_object_complete_part(key, &etag_map, upload_id.as_str())
220 .await;
221 if resp.error_no != ErrNo::SUCCESS {
222 self.abort_object_part(key, upload_id.as_str()).await;
224 }
225 return resp;
226 }
227 let mut size = part_size;
228 if file_size - start < part_size {
230 size = file_size - start;
231 }
232 if file_size - size - start <= 1024 * 1024 {
234 size = file_size - start;
235 }
236 if let Err(e) = file.seek(SeekFrom::Start(start as u64)).await {
237 self.abort_object_part(key, upload_id.as_str()).await;
239 return Response::new(
240 ErrNo::IO,
241 format!("设置文件指针失败: {}, {}", file_path, e),
242 Default::default(),
243 );
244 }
245 let mut body: Vec<u8> = vec![0; size as usize];
246 if let Err(e) = file.read_exact(&mut body).await {
247 self.abort_object_part(key, upload_id.as_str()).await;
249 return Response::new(
250 ErrNo::IO,
251 format!("读取文件失败: {}, {}", file_path, e),
252 Default::default(),
253 );
254 }
255 let resp = self
256 .put_object_part(
257 key,
258 upload_id.as_str(),
259 part_number,
260 body,
261 &content_type,
262 acl_header,
263 )
264 .await;
265 if resp.error_no != ErrNo::SUCCESS {
266 self.abort_object_part(key, upload_id.as_str()).await;
268 return resp;
269 }
270 etag_map.insert(part_number, resp.headers["etag"].clone());
271 part_number += 1;
272 }
273 }
274
275 async fn put_object_binary<T: Into<Body> + Send>(
293 &self,
294 file: T,
295 key: &str,
296 content_type: mime::Mime,
297 acl_header: Option<&acl::AclHeader>,
298 ) -> Response {
299 let body: Body = file.into();
300 let bytes = body.as_bytes();
301 if bytes.is_none() {
302 return Response::new(ErrNo::IO, "不是内存对象".to_owned(), Default::default());
303 }
304 let file_size = bytes.unwrap().len();
305 let mut headers = self.gen_common_headers();
306 headers.insert("Content-Type".to_string(), content_type.to_string());
307 headers.insert("Content-Length".to_string(), file_size.to_string());
308 let url_path = self.get_path_from_object_key(key);
309 headers =
310 self.get_headers_with_auth("put", url_path.as_str(), acl_header, Some(headers), None);
311 let resp = Request::put(
312 self.get_full_url_from_path(url_path.as_str()).as_str(),
313 None,
314 Some(&headers),
315 None,
316 None,
317 Some(body),
318 )
319 .await;
320 self.make_response(resp)
321 }
322 async fn delete_object(&self, key: &str) -> Response {
335 let url_path = self.get_path_from_object_key(key);
336 let headers = self.get_headers_with_auth("delete", url_path.as_str(), None, None, None);
337 let resp = Request::delete(
338 self.get_full_url_from_path(url_path.as_str()).as_str(),
339 None,
340 Some(&headers),
341 None,
342 None,
343 )
344 .await;
345 match resp {
346 Ok(e) => e,
347 Err(e) => e,
348 }
349 }
350
351 async fn get_object_binary(&self, key: &str) -> Response {
364 let url_path = self.get_path_from_object_key(key);
365 let headers = self.get_headers_with_auth("get", url_path.as_str(), None, None, None);
366 let resp = Request::get(
367 self.get_full_url_from_path(url_path.as_str()).as_str(),
368 None,
369 Some(&headers),
370 )
371 .await;
372 self.make_response(resp)
373 }
374
375 async fn get_object(&self, key: &str, file_name: &str) -> Response {
388 let resp = self.get_object_binary(key).await;
389 if resp.error_no == ErrNo::SUCCESS {
390 let output_file_r = fs::File::create(file_name);
391 let mut output_file;
392 match output_file_r {
393 Ok(e) => output_file = e,
394 Err(e) => {
395 return Response::new(
396 ErrNo::OTHER,
397 format!("创建文件失败: {}", e),
398 "".to_string(),
399 );
400 }
401 }
402 if let Err(e) = std::io::copy(&mut Cursor::new(resp.result), &mut output_file) {
403 return Response::new(ErrNo::OTHER, format!("下载文件失败: {}", e), "".to_string());
404 }
405 return Response::blank_success();
406 }
407 resp
408 }
409 async fn put_object_get_upload_id(
412 &self,
413 key: &str,
414 content_type: &mime::Mime,
415 storage_class: &str,
416 acl_header: Option<&acl::AclHeader>,
417 ) -> Response {
418 let mut query = HashMap::new();
419 query.insert("uploads".to_string(), "".to_string());
420 let url_path = self.get_path_from_object_key(key);
421 let mut headers = self.gen_common_headers();
422 headers.insert("Content-Type".to_string(), content_type.to_string());
423 headers.insert("x-cos-storage-class".to_string(), storage_class.to_string());
424 let headers = self.get_headers_with_auth(
425 "post",
426 url_path.as_str(),
427 acl_header,
428 Some(headers),
429 Some(&query),
430 );
431 let resp = Request::post(
432 self.get_full_url_from_path(url_path.as_str()).as_str(),
433 Some(&query),
434 Some(&headers),
435 None,
436 None,
437 None as Option<Body>,
438 )
439 .await;
440 match resp {
441 Ok(res) => {
442 if res.error_no != ErrNo::SUCCESS {
443 return res;
444 }
445 match quick_xml::de::from_slice::<InitiateMultipartUploadResult>(&res.result[..]) {
446 Ok(res) => Response::new(ErrNo::SUCCESS, "".to_string(), res.upload_id),
447 Err(e) => Response::new(ErrNo::DECODE, e.to_string(), Default::default()),
448 }
449 }
450 Err(e) => e,
451 }
452 }
453
454 async fn put_object_part(
457 &self,
458 key: &str,
459 upload_id: &str,
460 part_number: u64,
461 body: Vec<u8>,
462 content_type: &mime::Mime,
463 acl_header: Option<&acl::AclHeader>,
464 ) -> Response {
465 let mut headers = self.gen_common_headers();
466 headers.insert("Content-Type".to_string(), content_type.to_string());
467 headers.insert("Content-Length".to_string(), body.len().to_string());
468 let url_path = self.get_path_from_object_key(key);
469 let mut query = HashMap::new();
470 query.insert("partNumber".to_string(), part_number.to_string());
471 query.insert("uploadId".to_string(), upload_id.to_string());
472 headers = self.get_headers_with_auth(
473 "put",
474 url_path.as_str(),
475 acl_header,
476 Some(headers),
477 Some(&query),
478 );
479 let resp = Request::put(
480 self.get_full_url_from_path(url_path.as_str()).as_str(),
481 Some(&query),
482 Some(&headers),
483 None,
484 None,
485 Some(body),
486 )
487 .await;
488 self.make_response(resp)
489 }
490
491 async fn put_object_complete_part(
494 &self,
495 key: &str,
496 etag_map: &HashMap<u64, String>,
497 upload_id: &str,
498 ) -> Response {
499 let url_path = self.get_path_from_object_key(key);
500 let mut query = HashMap::new();
501 query.insert("uploadId".to_string(), upload_id.to_string());
502 let mut headers = self.gen_common_headers();
503 headers.insert("Content-Type".to_string(), "application/xml".to_string());
504 let headers = self.get_headers_with_auth(
505 "post",
506 url_path.as_str(),
507 None,
508 Some(headers),
509 Some(&query),
510 );
511 let mut parts = Vec::new();
512 let mut keys = Vec::new();
514 for k in etag_map.keys() {
515 keys.push(k);
516 }
517 keys.sort();
518 for k in keys {
519 parts.push(Part {
520 part_number: k.clone(),
521 etag: etag_map[&k].clone(),
522 })
523 }
524 let complete = CompleteMultipartUpload { part: parts };
525 let serialized_str = match to_string(&complete) {
526 Ok(s) => s,
527 Err(e) => return Response::new(ErrNo::ENCODE, e.to_string(), Default::default()),
528 };
529 let resp = Request::post(
530 self.get_full_url_from_path(url_path.as_str()).as_str(),
531 Some(&query),
532 Some(&headers),
533 None,
534 None,
535 Some(serialized_str),
536 )
537 .await;
538 self.make_response(resp)
539 }
540
541 async fn abort_object_part(&self, key: &str, upload_id: &str) -> Response {
544 let url_path = self.get_path_from_object_key(key);
545 let mut query = HashMap::new();
546 query.insert("uploadId".to_string(), upload_id.to_string());
547 let headers =
548 self.get_headers_with_auth("delete", url_path.as_str(), None, None, Some(&query));
549 let resp = Request::delete(
550 self.get_full_url_from_path(url_path.as_str()).as_str(),
551 Some(&query),
552 Some(&headers),
553 None,
554 None,
555 )
556 .await;
557 self.make_response(resp)
558 }
559}