aliyun_oss_rs/object/
put_object.rs1use crate::{
2 common::{
3 Acl, CacheControl, ContentDisposition, StorageClass, invalid_metadata_key, url_encode,
4 },
5 error::{Error, normal_error},
6 request::{Oss, OssRequest},
7};
8use bytes::Bytes;
9use futures_util::StreamExt;
10use http::{Method, header};
11use http_body::Frame;
12use http_body_util::{Full, StreamBody};
13use std::collections::HashMap;
14use tokio::{fs::File, io::BufReader};
15use tokio_util::io::ReaderStream;
16
17pub struct PutObject {
25 req: OssRequest,
26 mime: Option<String>,
27 tags: HashMap<String, String>,
28 callback: Option<Box<dyn Fn(u64, u64) + Send + Sync + 'static>>,
29}
30impl PutObject {
31 pub(super) fn new(oss: Oss) -> Self {
32 PutObject {
33 req: OssRequest::new(oss, Method::PUT),
34 mime: None,
35 tags: HashMap::new(),
36 callback: None,
37 }
38 }
39 pub fn set_mime(mut self, mime: impl ToString) -> Self {
43 self.mime = Some(mime.to_string());
44 self
45 }
46 pub fn set_acl(mut self, acl: Acl) -> Self {
48 self.req.insert_header("x-oss-object-acl", acl);
49 self
50 }
51 pub fn set_storage_class(mut self, storage_class: StorageClass) -> Self {
53 self.req.insert_header("x-oss-storage-class", storage_class);
54 self
55 }
56 pub fn set_cache_control(mut self, cache_control: CacheControl) -> Self {
58 self.req.insert_header(header::CACHE_CONTROL, cache_control);
59 self
60 }
61 pub fn set_content_disposition(mut self, content_disposition: ContentDisposition) -> Self {
63 self.req
64 .insert_header(header::CONTENT_DISPOSITION, content_disposition);
65 self
66 }
67 pub fn forbid_overwrite(mut self) -> Self {
69 self.req.insert_header("x-oss-forbid-overwrite", "true");
70 self
71 }
72 pub fn set_meta(mut self, key: impl ToString, value: impl ToString) -> Self {
76 let key = key.to_string();
77 if !invalid_metadata_key(&key) {
78 self.req
79 .insert_header(format!("x-oss-meta-{}", key.to_string()), value);
80 }
81 self
82 }
83 pub fn set_tagging(mut self, key: impl ToString, value: impl ToString) -> Self {
85 self.tags.insert(key.to_string(), value.to_string());
86 self
87 }
88 pub fn set_callback(mut self, callback: Box<dyn Fn(u64, u64) + Send + Sync + 'static>) -> Self {
100 self.callback = Some(callback);
101 self
102 }
103 pub async fn send_file(mut self, file: impl ToString) -> Result<(), Error> {
105 let file_type = match self.mime {
107 Some(mime) => mime,
108 None => match infer::get_from_path(&file.to_string())? {
109 Some(ext) => ext.mime_type().to_owned(),
110 None => mime_guess::from_path(
111 &self
112 .req
113 .oss
114 .object
115 .clone()
116 .map(|v| v.to_string())
117 .unwrap_or_else(|| String::new()),
118 )
119 .first()
120 .map(|v| v.to_string())
121 .unwrap_or_else(|| "application/octet-stream".to_owned())
122 .to_string(),
123 },
124 };
125 self.req.insert_header(header::CONTENT_TYPE, file_type);
126 let tags = self
128 .tags
129 .into_iter()
130 .map(|(key, value)| {
131 if value.is_empty() {
132 url_encode(&key.to_string())
133 } else {
134 format!(
135 "{}={}",
136 url_encode(&key.to_string()),
137 url_encode(&value.to_string())
138 )
139 }
140 })
141 .collect::<Vec<_>>()
142 .join("&");
143 if !tags.is_empty() {
144 self.req.insert_header("x-oss-tagging", tags);
145 }
146 let file = File::open(file.to_string()).await?;
148 let file_size = file.metadata().await?.len();
150 if file_size >= 5_368_709_120 {
151 return Err(Error::InvalidFileSize);
152 }
153 let buf = BufReader::with_capacity(131072, file);
155 let stream = ReaderStream::with_capacity(buf, 16384);
156 let mut uploaded_size = 0;
158 let body = StreamBody::new(stream.map(move |result| match result {
160 Ok(chunk) => {
161 if let Some(callback) = &self.callback {
162 let upload_size = chunk.len() as u64;
163 uploaded_size += upload_size;
164 callback(uploaded_size, file_size);
165 }
166 Ok(Frame::data(chunk))
167 }
168 Err(err) => Err(err),
169 }));
170 self.req.set_body(body);
171 let response = self.req.send_to_oss()?.await?;
173 let status_code = response.status();
175 match status_code {
176 code if code.is_success() => Ok(()),
177 _ => Err(normal_error(response).await),
178 }
179 }
180 pub async fn send_content(mut self, content: Vec<u8>) -> Result<(), Error> {
182 let content_type = match self.mime {
184 Some(mime) => mime,
185 None => match infer::get(&content) {
186 Some(ext) => ext.mime_type().to_string(),
187 None => mime_guess::from_path(
188 self.req
189 .oss
190 .object
191 .clone()
192 .map(|v| v.to_string())
193 .unwrap_or_else(|| String::new().into()),
194 )
195 .first()
196 .map(|v| v.to_string())
197 .unwrap_or_else(|| "application/octet-stream".to_owned())
198 .to_string(),
199 },
200 };
201 self.req.insert_header(header::CONTENT_TYPE, content_type);
202 let tags = self
204 .tags
205 .into_iter()
206 .map(|(key, value)| {
207 if value.is_empty() {
208 url_encode(&key.to_string())
209 } else {
210 format!(
211 "{}={}",
212 url_encode(&key.to_string()),
213 url_encode(&value.to_string())
214 )
215 }
216 })
217 .collect::<Vec<_>>()
218 .join("&");
219 if !tags.is_empty() {
220 self.req.insert_header("x-oss-tagging", tags);
221 }
222 let content_size = content.len() as u64;
224 if content_size >= 5_368_709_120 {
225 return Err(Error::InvalidFileSize);
226 }
227 self.req.insert_header(header::CONTENT_LENGTH, content_size);
228 self.req.set_body(Full::new(Bytes::from(content)));
230 let response = self.req.send_to_oss()?.await?;
232 let status_code = response.status();
234 match status_code {
235 code if code.is_success() => Ok(()),
236 _ => Err(normal_error(response).await),
237 }
238 }
239}