aliyun_oss_rs/object/
append_object.rs1use crate::{
2 common::{
3 Acl, CacheControl, ContentDisposition, StorageClass, invalid_metadata_key, url_encode,
4 },
5 error::{Error, normal_error},
6 request::{Oss, OssRequest},
7};
8use bytes::Bytes;
9use futures_util::StreamExt;
10use http::{Method, header};
11use http_body::Frame;
12use http_body_util::{Full, StreamBody};
13use std::collections::HashMap;
14use tokio::{fs::File, io::BufReader};
15use tokio_util::io::ReaderStream;
16
17pub struct AppendObject {
25 req: OssRequest,
26 mime: Option<String>,
27 tags: HashMap<String, String>,
28 callback: Option<Box<dyn Fn(u64, u64) + Send + Sync + 'static>>,
29}
30
31impl AppendObject {
32 pub(super) fn new(oss: Oss) -> Self {
33 let mut req = OssRequest::new(oss, Method::POST);
34 req.insert_query("append", "");
35 req.insert_query("position", "0");
36 AppendObject {
37 req,
38 mime: None,
39 tags: HashMap::new(),
40 callback: None,
41 }
42 }
43 pub fn set_position(mut self, position: u32) -> Self {
45 self.req.insert_query("position", position);
46 self
47 }
48 pub fn set_mime(mut self, mime: impl ToString) -> Self {
52 self.mime = Some(mime.to_string());
53 self
54 }
55 pub fn set_acl(mut self, acl: Acl) -> Self {
57 self.req.insert_header("x-oss-object-acl", acl);
58 self
59 }
60 pub fn set_storage_class(mut self, storage_class: StorageClass) -> Self {
62 self.req.insert_header("x-oss-storage-class", storage_class);
63 self
64 }
65 pub fn set_cache_control(mut self, cache_control: CacheControl) -> Self {
67 self.req.insert_header(header::CACHE_CONTROL, cache_control);
68 self
69 }
70 pub fn set_content_disposition(mut self, content_disposition: ContentDisposition) -> Self {
72 self.req
73 .insert_header(header::CONTENT_DISPOSITION, content_disposition);
74 self
75 }
76 pub fn set_meta(mut self, key: impl ToString, value: impl ToString) -> Self {
80 let key = key.to_string();
81 if !invalid_metadata_key(&key) {
82 self.req
83 .insert_header(format!("x-oss-meta-{}", key.to_string()), value);
84 }
85 self
86 }
87 pub fn set_tagging(mut self, key: impl ToString, value: impl ToString) -> Self {
89 self.tags.insert(key.to_string(), value.to_string());
90 self
91 }
92 pub fn set_callback(mut self, callback: Box<dyn Fn(u64, u64) + Send + Sync + 'static>) -> Self {
104 self.callback = Some(callback);
105 self
106 }
107 pub async fn send_file(mut self, file: impl ToString) -> Result<Option<String>, Error> {
112 let file_type = match self.mime {
114 Some(mime) => mime,
115 None => match infer::get_from_path(&file.to_string())? {
116 Some(ext) => ext.mime_type().to_owned(),
117 None => mime_guess::from_path(
118 &self
119 .req
120 .oss
121 .object
122 .clone()
123 .map(|v| v.to_string())
124 .unwrap_or_else(|| String::new()),
125 )
126 .first()
127 .map(|v| v.to_string())
128 .unwrap_or_else(|| "application/octet-stream".to_owned())
129 .to_string(),
130 },
131 };
132 self.req.insert_header(header::CONTENT_TYPE, file_type);
133 let tags = self
135 .tags
136 .into_iter()
137 .map(|(key, value)| {
138 if value.is_empty() {
139 url_encode(&key.to_string())
140 } else {
141 format!(
142 "{}={}",
143 url_encode(&key.to_string()),
144 url_encode(&value.to_string())
145 )
146 }
147 })
148 .collect::<Vec<_>>()
149 .join("&");
150 if !tags.is_empty() {
151 self.req.insert_header("x-oss-tagging", tags);
152 }
153 let file = File::open(file.to_string()).await?;
155 let file_size = file.metadata().await?.len();
157 if file_size >= 5_368_709_120 {
158 return Err(Error::InvalidFileSize);
159 }
160 self.req.insert_header(header::CONTENT_LENGTH, file_size);
161 let buf = BufReader::with_capacity(131072, file);
163 let stream = ReaderStream::with_capacity(buf, 16384);
164 let mut uploaded_size = 0;
166 let body = StreamBody::new(stream.map(move |result| match result {
168 Ok(chunk) => {
169 if let Some(callback) = &self.callback {
170 let upload_size = chunk.len() as u64;
171 uploaded_size += upload_size;
172 callback(uploaded_size, file_size);
173 }
174 Ok(Frame::data(chunk))
175 }
176 Err(err) => Err(err),
177 }));
178 self.req.set_body(body);
179 let response = self.req.send_to_oss()?.await?;
181 let status_code = response.status();
183 match status_code {
184 code if code.is_success() => {
185 let next_position = response
186 .headers()
187 .get("x-oss-next-append-position")
188 .and_then(|header| header.to_str().ok().map(|s| s.to_owned()));
189 Ok(next_position)
190 }
191 _ => Err(normal_error(response).await),
192 }
193 }
194 pub async fn send_content(mut self, content: Vec<u8>) -> Result<Option<String>, Error> {
197 let content_size = content.len();
199 if content_size >= 5_368_709_120 {
200 return Err(Error::InvalidFileSize);
201 }
202 self.req.insert_header(header::CONTENT_LENGTH, content_size);
203 let content_type = match self.mime {
205 Some(mime) => mime,
206 None => match infer::get(&content) {
207 Some(ext) => ext.mime_type().to_string(),
208 None => mime_guess::from_path(
209 self.req
210 .oss
211 .object
212 .clone()
213 .map(|v| v.to_string())
214 .unwrap_or_else(|| String::new().into()),
215 )
216 .first()
217 .map(|v| v.to_string())
218 .unwrap_or_else(|| "application/octet-stream".to_owned())
219 .to_string(),
220 },
221 };
222 self.req.insert_header(header::CONTENT_TYPE, content_type);
223 let tags = self
225 .tags
226 .into_iter()
227 .map(|(key, value)| {
228 if value.is_empty() {
229 url_encode(&key.to_string())
230 } else {
231 format!(
232 "{}={}",
233 url_encode(&key.to_string()),
234 url_encode(&value.to_string())
235 )
236 }
237 })
238 .collect::<Vec<_>>()
239 .join("&");
240 if !tags.is_empty() {
241 self.req.insert_header("x-oss-tagging", tags);
242 }
243 self.req.set_body(Full::new(Bytes::from(content)));
245 let response = self.req.send_to_oss()?.await?;
247 let status_code = response.status();
249 match status_code {
250 code if code.is_success() => {
251 let next_position = response
252 .headers()
253 .get("x-oss-next-append-position")
254 .and_then(|header| header.to_str().ok().map(|s| s.to_owned()));
255 Ok(next_position)
256 }
257 _ => Err(normal_error(response).await),
258 }
259 }
260}