aliyun_oss_rs/object/
put_object.rs

1use crate::{
2    common::{
3        Acl, CacheControl, ContentDisposition, StorageClass, invalid_metadata_key, url_encode,
4    },
5    error::{Error, normal_error},
6    request::{Oss, OssRequest},
7};
8use bytes::Bytes;
9use futures_util::StreamExt;
10use http::{Method, header};
11use http_body::Frame;
12use http_body_util::{Full, StreamBody};
13use std::collections::HashMap;
14use tokio::{fs::File, io::BufReader};
15use tokio_util::io::ReaderStream;
16
17/// Upload a file
18///
19/// The object size cannot exceed 5GB
20///
21/// By default, if an object with the same name exists and you have access, the new object overwrites it
22///
23/// See the [Alibaba Cloud documentation](https://help.aliyun.com/document_detail/31978.html) for details
24pub struct PutObject {
25    req: OssRequest,
26    mime: Option<String>,
27    tags: HashMap<String, String>,
28    callback: Option<Box<dyn Fn(u64, u64) + Send + Sync + 'static>>,
29}
30impl PutObject {
31    pub(super) fn new(oss: Oss) -> Self {
32        PutObject {
33            req: OssRequest::new(oss, Method::PUT),
34            mime: None,
35            tags: HashMap::new(),
36            callback: None,
37        }
38    }
39    /// Set the file's MIME type
40    ///
41    /// If no MIME type is set, the request will attempt to obtain it from the content, local path, or remote path; if still not found, the default MIME type (application/octet-stream) is used
42    pub fn set_mime(mut self, mime: impl ToString) -> Self {
43        self.mime = Some(mime.to_string());
44        self
45    }
46    /// Set the file's access permissions
47    pub fn set_acl(mut self, acl: Acl) -> Self {
48        self.req.insert_header("x-oss-object-acl", acl);
49        self
50    }
51    /// Set the file's storage class
52    pub fn set_storage_class(mut self, storage_class: StorageClass) -> Self {
53        self.req.insert_header("x-oss-storage-class", storage_class);
54        self
55    }
56    /// Cache behavior of the webpage when the file is downloaded
57    pub fn set_cache_control(mut self, cache_control: CacheControl) -> Self {
58        self.req.insert_header(header::CACHE_CONTROL, cache_control);
59        self
60    }
61    /// Set how the file is presented
62    pub fn set_content_disposition(mut self, content_disposition: ContentDisposition) -> Self {
63        self.req
64            .insert_header(header::CONTENT_DISPOSITION, content_disposition);
65        self
66    }
67    /// Disallow overwriting files with the same name
68    pub fn forbid_overwrite(mut self) -> Self {
69        self.req.insert_header("x-oss-forbid-overwrite", "true");
70        self
71    }
72    /// Set additional metadata
73    ///
74    /// Keys may only contain letters, numbers, and hyphens; metadata with other characters will be discarded
75    pub fn set_meta(mut self, key: impl ToString, value: impl ToString) -> Self {
76        let key = key.to_string();
77        if !invalid_metadata_key(&key) {
78            self.req
79                .insert_header(format!("x-oss-meta-{}", key.to_string()), value);
80        }
81        self
82    }
83    /// Set tag information
84    pub fn set_tagging(mut self, key: impl ToString, value: impl ToString) -> Self {
85        self.tags.insert(key.to_string(), value.to_string());
86        self
87    }
88    /// Set a callback for upload progress; this only applies to `send_file()`
89    /// ```
90    /// let callback = Box::new(|uploaded_size: u64, total_size: u64| {
91    ///     let percentage = if total_size == 0 {
92    ///         100.0
93    ///     } else {
94    ///         (uploaded_size as f64) / (total_size as f64) * 100.00
95    ///     };
96    ///     println!("{:.2}%", percentage);
97    /// });
98    /// ```
99    pub fn set_callback(mut self, callback: Box<dyn Fn(u64, u64) + Send + Sync + 'static>) -> Self {
100        self.callback = Some(callback);
101        self
102    }
103    /// Upload a file from disk to OSS
104    pub async fn send_file(mut self, file: impl ToString) -> Result<(), Error> {
105        // Determine file MIME type
106        let file_type = match self.mime {
107            Some(mime) => mime,
108            None => match infer::get_from_path(&file.to_string())? {
109                Some(ext) => ext.mime_type().to_owned(),
110                None => mime_guess::from_path(
111                    &self
112                        .req
113                        .oss
114                        .object
115                        .clone()
116                        .map(|v| v.to_string())
117                        .unwrap_or_else(|| String::new()),
118                )
119                .first()
120                .map(|v| v.to_string())
121                .unwrap_or_else(|| "application/octet-stream".to_owned())
122                .to_string(),
123            },
124        };
125        self.req.insert_header(header::CONTENT_TYPE, file_type);
126        // Insert tags
127        let tags = self
128            .tags
129            .into_iter()
130            .map(|(key, value)| {
131                if value.is_empty() {
132                    url_encode(&key.to_string())
133                } else {
134                    format!(
135                        "{}={}",
136                        url_encode(&key.to_string()),
137                        url_encode(&value.to_string())
138                    )
139                }
140            })
141            .collect::<Vec<_>>()
142            .join("&");
143        if !tags.is_empty() {
144            self.req.insert_header("x-oss-tagging", tags);
145        }
146        // Open the file
147        let file = File::open(file.to_string()).await?;
148        // Read the file size
149        let file_size = file.metadata().await?.len();
150        if file_size >= 5_368_709_120 {
151            return Err(Error::InvalidFileSize);
152        }
153        // Initialize the data stream for reading file content
154        let buf = BufReader::with_capacity(131072, file);
155        let stream = ReaderStream::with_capacity(buf, 16384);
156        // Initialize the uploaded content size
157        let mut uploaded_size = 0;
158        // Initialize upload request
159        let body = StreamBody::new(stream.map(move |result| match result {
160            Ok(chunk) => {
161                if let Some(callback) = &self.callback {
162                    let upload_size = chunk.len() as u64;
163                    uploaded_size += upload_size;
164                    callback(uploaded_size, file_size);
165                }
166                Ok(Frame::data(chunk))
167            }
168            Err(err) => Err(err),
169        }));
170        self.req.set_body(body);
171        // Upload file
172        let response = self.req.send_to_oss()?.await?;
173        // Parse the response
174        let status_code = response.status();
175        match status_code {
176            code if code.is_success() => Ok(()),
177            _ => Err(normal_error(response).await),
178        }
179    }
180    /// Upload in-memory data to OSS
181    pub async fn send_content(mut self, content: Vec<u8>) -> Result<(), Error> {
182        // Determine file MIME type
183        let content_type = match self.mime {
184            Some(mime) => mime,
185            None => match infer::get(&content) {
186                Some(ext) => ext.mime_type().to_string(),
187                None => mime_guess::from_path(
188                    self.req
189                        .oss
190                        .object
191                        .clone()
192                        .map(|v| v.to_string())
193                        .unwrap_or_else(|| String::new().into()),
194                )
195                .first()
196                .map(|v| v.to_string())
197                .unwrap_or_else(|| "application/octet-stream".to_owned())
198                .to_string(),
199            },
200        };
201        self.req.insert_header(header::CONTENT_TYPE, content_type);
202        // Insert tags
203        let tags = self
204            .tags
205            .into_iter()
206            .map(|(key, value)| {
207                if value.is_empty() {
208                    url_encode(&key.to_string())
209                } else {
210                    format!(
211                        "{}={}",
212                        url_encode(&key.to_string()),
213                        url_encode(&value.to_string())
214                    )
215                }
216            })
217            .collect::<Vec<_>>()
218            .join("&");
219        if !tags.is_empty() {
220            self.req.insert_header("x-oss-tagging", tags);
221        }
222        // Read size
223        let content_size = content.len() as u64;
224        if content_size >= 5_368_709_120 {
225            return Err(Error::InvalidFileSize);
226        }
227        self.req.insert_header(header::CONTENT_LENGTH, content_size);
228        // Insert body
229        self.req.set_body(Full::new(Bytes::from(content)));
230        // Upload file
231        let response = self.req.send_to_oss()?.await?;
232        // Parse the response
233        let status_code = response.status();
234        match status_code {
235            code if code.is_success() => Ok(()),
236            _ => Err(normal_error(response).await),
237        }
238    }
239}