aliyun_oss_rs/object/
multipart_upload_part.rs1use crate::{
2 error::{Error, normal_error},
3 request::{Oss, OssRequest},
4};
5use bytes::Bytes;
6use futures_util::StreamExt;
7use http::{Method, header};
8use http_body::Frame;
9use http_body_util::{Full, StreamBody};
10use tokio::{fs::File, io::BufReader};
11use tokio_util::io::ReaderStream;
12
13pub struct UploadPart {
17 req: OssRequest,
18 callback: Option<Box<dyn Fn(u64, u64) + Send + Sync + 'static>>,
19}
20impl UploadPart {
21 pub(super) fn new(oss: Oss, part_number: u32, upload_id: impl ToString) -> Self {
22 let mut req = OssRequest::new(oss, Method::PUT);
23 req.insert_query("partNumber", part_number);
24 req.insert_query("uploadId", upload_id);
25 UploadPart {
26 req,
27 callback: None,
28 }
29 }
30 pub fn set_callback(mut self, callback: Box<dyn Fn(u64, u64) + Send + Sync + 'static>) -> Self {
42 self.callback = Some(callback);
43 self
44 }
45 pub async fn send_file(mut self, file: impl ToString) -> Result<String, Error> {
49 let file = File::open(file.to_string()).await?;
51 let file_size = file.metadata().await?.len();
53 if file_size >= 5_368_709_120 || file_size < 102_400 {
54 return Err(Error::InvalidFileSize);
55 }
56 let buf = BufReader::with_capacity(131072, file);
58 let stream = ReaderStream::with_capacity(buf, 16384);
59 let mut uploaded_size = 0;
61 let body = StreamBody::new(stream.map(move |result| match result {
63 Ok(chunk) => {
64 if let Some(callback) = &self.callback {
65 let upload_size = chunk.len() as u64;
66 uploaded_size += upload_size;
67 callback(uploaded_size, file_size);
68 }
69 Ok(Frame::data(chunk))
70 }
71 Err(err) => Err(err),
72 }));
73 self.req.set_body(body);
74 let response = self.req.send_to_oss()?.await?;
76 let status_code = response.status();
78 match status_code {
79 code if code.is_success() => {
80 let e_tag = response
81 .headers()
82 .get("ETag")
83 .map(|v| String::from_utf8(v.as_bytes().to_vec()).ok())
84 .flatten()
85 .unwrap_or_else(|| String::new());
86 Ok(e_tag)
87 }
88 _ => Err(normal_error(response).await),
89 }
90 }
91 pub async fn send_content(mut self, content: Vec<u8>) -> Result<String, Error> {
95 let content_size = content.len() as u64;
97 if content_size >= 5_000_000_000 {
98 return Err(Error::InvalidFileSize);
99 }
100 self.req.insert_header(header::CONTENT_LENGTH, content_size);
101 self.req.set_body(Full::new(Bytes::from(content)));
103 let response = self.req.send_to_oss()?.await?;
105 let status_code = response.status();
107 match status_code {
108 code if code.is_success() => {
109 let e_tag = response
110 .headers()
111 .get("ETag")
112 .map(|v| String::from_utf8(v.as_bytes().to_vec()).ok())
113 .flatten()
114 .unwrap_or_else(|| String::new());
115 Ok(e_tag)
116 }
117 _ => Err(normal_error(response).await),
118 }
119 }
120}