aliyun_oss_rs/object/
multipart_upload_part.rs

1use crate::{
2    error::{Error, normal_error},
3    request::{Oss, OssRequest},
4};
5use bytes::Bytes;
6use futures_util::StreamExt;
7use http::{Method, header};
8use http_body::Frame;
9use http_body_util::{Full, StreamBody};
10use tokio::{fs::File, io::BufReader};
11use tokio_util::io::ReaderStream;
12
/// Request builder that uploads a single part of a multipart upload
///
/// The part data is sent with `send_file()` or `send_content()`, both of
/// which return the ETag reported for the uploaded part.
///
/// See the [Alibaba Cloud documentation](https://help.aliyun.com/document_detail/31993.html) for details
pub struct UploadPart {
    // Underlying OSS request (method, query parameters, headers, body)
    req: OssRequest,
    // Optional progress callback, invoked as `(uploaded_bytes, total_bytes)`
    callback: Option<Box<dyn Fn(u64, u64) + Send + Sync + 'static>>,
}
20impl UploadPart {
21    pub(super) fn new(oss: Oss, part_number: u32, upload_id: impl ToString) -> Self {
22        let mut req = OssRequest::new(oss, Method::PUT);
23        req.insert_query("partNumber", part_number);
24        req.insert_query("uploadId", upload_id);
25        UploadPart {
26            req,
27            callback: None,
28        }
29    }
30    /// Set a callback for upload progress; this only applies to `send_file()`
31    /// ```
32    /// let callback = Box::new(|uploaded_size: u64, total_size: u64| {
33    ///     let percentage = if total_size == 0 {
34    ///         100.0
35    ///     } else {
36    ///         (uploaded_size as f64) / (total_size as f64) * 100.00
37    ///     };
38    ///     println!("{:.2}%", percentage);
39    /// });
40    /// ```
41    pub fn set_callback(mut self, callback: Box<dyn Fn(u64, u64) + Send + Sync + 'static>) -> Self {
42        self.callback = Some(callback);
43        self
44    }
45    /// Upload a file from disk to OSS
46    ///
47    /// Returns the ETag
48    pub async fn send_file(mut self, file: impl ToString) -> Result<String, Error> {
49        // Open the file
50        let file = File::open(file.to_string()).await?;
51        // Read the file size
52        let file_size = file.metadata().await?.len();
53        if file_size >= 5_368_709_120 || file_size < 102_400 {
54            return Err(Error::InvalidFileSize);
55        }
56        // Initialize the data stream for reading file content
57        let buf = BufReader::with_capacity(131072, file);
58        let stream = ReaderStream::with_capacity(buf, 16384);
59        // Initialize the uploaded content size
60        let mut uploaded_size = 0;
61        // Initialize upload request
62        let body = StreamBody::new(stream.map(move |result| match result {
63            Ok(chunk) => {
64                if let Some(callback) = &self.callback {
65                    let upload_size = chunk.len() as u64;
66                    uploaded_size += upload_size;
67                    callback(uploaded_size, file_size);
68                }
69                Ok(Frame::data(chunk))
70            }
71            Err(err) => Err(err),
72        }));
73        self.req.set_body(body);
74        // Upload file
75        let response = self.req.send_to_oss()?.await?;
76        // Parse the response
77        let status_code = response.status();
78        match status_code {
79            code if code.is_success() => {
80                let e_tag = response
81                    .headers()
82                    .get("ETag")
83                    .map(|v| String::from_utf8(v.as_bytes().to_vec()).ok())
84                    .flatten()
85                    .unwrap_or_else(|| String::new());
86                Ok(e_tag)
87            }
88            _ => Err(normal_error(response).await),
89        }
90    }
91    /// Upload in-memory data to OSS
92    ///
93    /// Returns the ETag
94    pub async fn send_content(mut self, content: Vec<u8>) -> Result<String, Error> {
95        // Read size
96        let content_size = content.len() as u64;
97        if content_size >= 5_000_000_000 {
98            return Err(Error::InvalidFileSize);
99        }
100        self.req.insert_header(header::CONTENT_LENGTH, content_size);
101        // Insert body
102        self.req.set_body(Full::new(Bytes::from(content)));
103        // Upload file
104        let response = self.req.send_to_oss()?.await?;
105        // Parse the response
106        let status_code = response.status();
107        match status_code {
108            code if code.is_success() => {
109                let e_tag = response
110                    .headers()
111                    .get("ETag")
112                    .map(|v| String::from_utf8(v.as_bytes().to_vec()).ok())
113                    .flatten()
114                    .unwrap_or_else(|| String::new());
115                Ok(e_tag)
116            }
117            _ => Err(normal_error(response).await),
118        }
119    }
120}