aliyun_oss_rs/object/
append_object.rs

1use crate::{
2    common::{
3        Acl, CacheControl, ContentDisposition, StorageClass, invalid_metadata_key, url_encode,
4    },
5    error::{Error, normal_error},
6    request::{Oss, OssRequest},
7};
8use bytes::Bytes;
9use futures_util::StreamExt;
10use http::{Method, header};
11use http_body::Frame;
12use http_body_util::{Full, StreamBody};
13use std::collections::HashMap;
14use tokio::{fs::File, io::BufReader};
15use tokio_util::io::ReaderStream;
16
17/// Append to a file
18///
19/// Only files of type Appendable can be appended; files uploaded via the put method cannot be appended
20///
21/// The final size of the file after appending cannot exceed 5GB
22///
23/// The logic and limitations of append operations are complex; please read the [Alibaba Cloud documentation](https://help.aliyun.com/document_detail/31978.html)
24pub struct AppendObject {
25    req: OssRequest,
26    mime: Option<String>,
27    tags: HashMap<String, String>,
28    callback: Option<Box<dyn Fn(u64, u64) + Send + Sync + 'static>>,
29}
30
31impl AppendObject {
32    pub(super) fn new(oss: Oss) -> Self {
33        let mut req = OssRequest::new(oss, Method::POST);
34        req.insert_query("append", "");
35        req.insert_query("position", "0");
36        AppendObject {
37            req,
38            mime: None,
39            tags: HashMap::new(),
40            callback: None,
41        }
42    }
43    /// Set the starting position for the appended content
44    pub fn set_position(mut self, position: u32) -> Self {
45        self.req.insert_query("position", position);
46        self
47    }
48    /// Set the file's MIME type
49    ///
50    /// If no MIME type is set, the request will try to determine it from the content, local path, or remote path. If still unsuccessful, the default MIME type (application/octet-stream) is used
51    pub fn set_mime(mut self, mime: impl ToString) -> Self {
52        self.mime = Some(mime.to_string());
53        self
54    }
55    /// Set the file's access permissions
56    pub fn set_acl(mut self, acl: Acl) -> Self {
57        self.req.insert_header("x-oss-object-acl", acl);
58        self
59    }
60    /// Set the file's storage class
61    pub fn set_storage_class(mut self, storage_class: StorageClass) -> Self {
62        self.req.insert_header("x-oss-storage-class", storage_class);
63        self
64    }
65    /// Cache behavior of the webpage when the file is downloaded
66    pub fn set_cache_control(mut self, cache_control: CacheControl) -> Self {
67        self.req.insert_header(header::CACHE_CONTROL, cache_control);
68        self
69    }
70    /// Set how the file is presented
71    pub fn set_content_disposition(mut self, content_disposition: ContentDisposition) -> Self {
72        self.req
73            .insert_header(header::CONTENT_DISPOSITION, content_disposition);
74        self
75    }
76    /// Set additional metadata
77    ///
78    /// Keys may only contain letters, numbers, and hyphens; metadata with other characters will be discarded
79    pub fn set_meta(mut self, key: impl ToString, value: impl ToString) -> Self {
80        let key = key.to_string();
81        if !invalid_metadata_key(&key) {
82            self.req
83                .insert_header(format!("x-oss-meta-{}", key.to_string()), value);
84        }
85        self
86    }
87    /// Set tag information
88    pub fn set_tagging(mut self, key: impl ToString, value: impl ToString) -> Self {
89        self.tags.insert(key.to_string(), value.to_string());
90        self
91    }
92    /// Set a callback for upload progress; this only applies to `send_file()`
93    /// ```
94    /// let callback = Box::new(|uploaded_size: u64, total_size: u64| {
95    ///     let percentage = if total_size == 0 {
96    ///         100.0
97    ///     } else {
98    ///         (uploaded_size as f64) / (total_size as f64) * 100.00
99    ///     };
100    ///     println!("{:.2}%", percentage);
101    /// });
102    /// ```
103    pub fn set_callback(mut self, callback: Box<dyn Fn(u64, u64) + Send + Sync + 'static>) -> Self {
104        self.callback = Some(callback);
105        self
106    }
107    /// Upload a file from disk to OSS
108    ///
109    /// If a progress callback is set, the caller receives real-time updates
110    ///
111    pub async fn send_file(mut self, file: impl ToString) -> Result<Option<String>, Error> {
112        // Determine file MIME type
113        let file_type = match self.mime {
114            Some(mime) => mime,
115            None => match infer::get_from_path(&file.to_string())? {
116                Some(ext) => ext.mime_type().to_owned(),
117                None => mime_guess::from_path(
118                    &self
119                        .req
120                        .oss
121                        .object
122                        .clone()
123                        .map(|v| v.to_string())
124                        .unwrap_or_else(|| String::new()),
125                )
126                .first()
127                .map(|v| v.to_string())
128                .unwrap_or_else(|| "application/octet-stream".to_owned())
129                .to_string(),
130            },
131        };
132        self.req.insert_header(header::CONTENT_TYPE, file_type);
133        // Insert tags
134        let tags = self
135            .tags
136            .into_iter()
137            .map(|(key, value)| {
138                if value.is_empty() {
139                    url_encode(&key.to_string())
140                } else {
141                    format!(
142                        "{}={}",
143                        url_encode(&key.to_string()),
144                        url_encode(&value.to_string())
145                    )
146                }
147            })
148            .collect::<Vec<_>>()
149            .join("&");
150        if !tags.is_empty() {
151            self.req.insert_header("x-oss-tagging", tags);
152        }
153        // Open the file
154        let file = File::open(file.to_string()).await?;
155        // Read the file size
156        let file_size = file.metadata().await?.len();
157        if file_size >= 5_368_709_120 {
158            return Err(Error::InvalidFileSize);
159        }
160        self.req.insert_header(header::CONTENT_LENGTH, file_size);
161        // Initialize the data stream for reading file content
162        let buf = BufReader::with_capacity(131072, file);
163        let stream = ReaderStream::with_capacity(buf, 16384);
164        // Initialize the uploaded content size
165        let mut uploaded_size = 0;
166        // Create body object
167        let body = StreamBody::new(stream.map(move |result| match result {
168            Ok(chunk) => {
169                if let Some(callback) = &self.callback {
170                    let upload_size = chunk.len() as u64;
171                    uploaded_size += upload_size;
172                    callback(uploaded_size, file_size);
173                }
174                Ok(Frame::data(chunk))
175            }
176            Err(err) => Err(err),
177        }));
178        self.req.set_body(body);
179        // Build the HTTP request
180        let response = self.req.send_to_oss()?.await?;
181        // Parse the response
182        let status_code = response.status();
183        match status_code {
184            code if code.is_success() => {
185                let next_position = response
186                    .headers()
187                    .get("x-oss-next-append-position")
188                    .and_then(|header| header.to_str().ok().map(|s| s.to_owned()));
189                Ok(next_position)
190            }
191            _ => Err(normal_error(response).await),
192        }
193    }
194    /// Upload in-memory data to OSS
195    ///
196    pub async fn send_content(mut self, content: Vec<u8>) -> Result<Option<String>, Error> {
197        // Read the file size
198        let content_size = content.len();
199        if content_size >= 5_368_709_120 {
200            return Err(Error::InvalidFileSize);
201        }
202        self.req.insert_header(header::CONTENT_LENGTH, content_size);
203        // Determine file MIME type
204        let content_type = match self.mime {
205            Some(mime) => mime,
206            None => match infer::get(&content) {
207                Some(ext) => ext.mime_type().to_string(),
208                None => mime_guess::from_path(
209                    self.req
210                        .oss
211                        .object
212                        .clone()
213                        .map(|v| v.to_string())
214                        .unwrap_or_else(|| String::new().into()),
215                )
216                .first()
217                .map(|v| v.to_string())
218                .unwrap_or_else(|| "application/octet-stream".to_owned())
219                .to_string(),
220            },
221        };
222        self.req.insert_header(header::CONTENT_TYPE, content_type);
223        // Insert tags
224        let tags = self
225            .tags
226            .into_iter()
227            .map(|(key, value)| {
228                if value.is_empty() {
229                    url_encode(&key.to_string())
230                } else {
231                    format!(
232                        "{}={}",
233                        url_encode(&key.to_string()),
234                        url_encode(&value.to_string())
235                    )
236                }
237            })
238            .collect::<Vec<_>>()
239            .join("&");
240        if !tags.is_empty() {
241            self.req.insert_header("x-oss-tagging", tags);
242        }
243        // Insert body
244        self.req.set_body(Full::new(Bytes::from(content)));
245        // Build the HTTP request
246        let response = self.req.send_to_oss()?.await?;
247        // Parse the response
248        let status_code = response.status();
249        match status_code {
250            code if code.is_success() => {
251                let next_position = response
252                    .headers()
253                    .get("x-oss-next-append-position")
254                    .and_then(|header| header.to_str().ok().map(|s| s.to_owned()));
255                Ok(next_position)
256            }
257            _ => Err(normal_error(response).await),
258        }
259    }
260}