s3/bucket/
put.rs

1use crate::bucket::CorsConfiguration;
2use crate::bucket::{
3    error_from_response_data, Bucket, CompleteMultipartUploadData, InitiateMultipartUploadResponse,
4    Part, Read, Request, CHUNK_SIZE,
5};
6use crate::command::{Command, Multipart};
7use crate::error::S3Error;
8use crate::request::{RequestImpl, ResponseData};
9
10use crate::bucket::PutStreamResponse;
11use crate::request::AsyncRead;
12
13impl Bucket {
14    pub async fn put_bucket_cors(
15        &self,
16        cors_config: CorsConfiguration,
17    ) -> Result<ResponseData, S3Error> {
18        let command = Command::PutBucketCors {
19            configuration: cors_config,
20        };
21        let request = RequestImpl::new(self, "?cors", command)?;
22        request.response_data(false).await
23    }
24
25    /// Stream file from local path to s3, generic over T: Write.
26    ///
27    /// # Example:
28    ///
29    /// ```rust,no_run
30    /// use s3::bucket::Bucket;
31    /// use s3::creds::Credentials;
32    /// use anyhow::Result;
33    /// use std::fs::File;
34    /// use std::io::Write;
35    ///
36    /// # #[tokio::main]
37    /// # async fn main() -> Result<()> {
38    ///
39    /// let bucket_name = "rust-s3-test";
40    /// let region = "us-east-1".parse()?;
41    /// let credentials = Credentials::default()?;
42    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
43    /// let path = "path";
44    /// let test: Vec<u8> = (0..1000).map(|_| 42).collect();
45    /// let mut file = File::create(path)?;
46    /// // tokio open file
47    /// let mut async_output_file = tokio::fs::File::create("async_output_file").await.expect("Unable to create file");
48    /// file.write_all(&test)?;
49    ///
50    /// let status_code = bucket.put_object_stream(&mut async_output_file, "/path").await?;
51    ///
52    /// #
53    /// # Ok(())
54    /// # }
55    /// ```
56    pub async fn put_object_stream<R: AsyncRead + Unpin>(
57        &self,
58        reader: &mut R,
59        s3_path: impl AsRef<str>,
60    ) -> Result<PutStreamResponse, S3Error> {
61        self._put_object_stream_with_content_type(
62            reader,
63            s3_path.as_ref(),
64            "application/octet-stream",
65        )
66        .await
67    }
68
69    /// Stream file from local path to s3, generic over T: Write with explicit content type.
70    ///
71    /// # Example:
72    ///
73    /// ```rust,no_run
74    /// use s3::bucket::Bucket;
75    /// use s3::creds::Credentials;
76    /// use anyhow::Result;
77    /// use std::fs::File;
78    /// use std::io::Write;
79    ///
80    /// # #[tokio::main]
81    /// # async fn main() -> Result<()> {
82    ///
83    /// let bucket_name = "rust-s3-test";
84    /// let region = "us-east-1".parse()?;
85    /// let credentials = Credentials::default()?;
86    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
87    /// let path = "path";
88    /// let test: Vec<u8> = (0..1000).map(|_| 42).collect();
89    /// let mut file = File::create(path)?;
90    /// file.write_all(&test)?;
91    ///
92    /// let mut async_output_file = tokio::fs::File::create("async_output_file").await.expect("Unable to create file");
93    ///
94    /// let status_code = bucket
95    ///     .put_object_stream_with_content_type(&mut async_output_file, "/path", "application/octet-stream")
96    ///     .await?;
97    ///
98    /// # Ok(())
99    /// # }
100    /// ```
101    pub async fn put_object_stream_with_content_type<R: AsyncRead + Unpin>(
102        &self,
103        reader: &mut R,
104        s3_path: impl AsRef<str>,
105        content_type: impl AsRef<str>,
106    ) -> Result<PutStreamResponse, S3Error> {
107        self._put_object_stream_with_content_type(reader, s3_path.as_ref(), content_type.as_ref())
108            .await
109    }
110
111    async fn make_multipart_request(
112        &self,
113        path: &str,
114        chunk: Vec<u8>,
115        part_number: u32,
116        upload_id: &str,
117        content_type: &str,
118    ) -> Result<ResponseData, S3Error> {
119        let command = Command::PutObject {
120            content: &chunk,
121            multipart: Some(Multipart::new(part_number, upload_id)), // upload_id: &msg.upload_id,
122            content_type,
123        };
124        let request = RequestImpl::new(self, path, command)?;
125        request.response_data(true).await
126    }
127
128    async fn _put_object_stream_with_content_type<R: AsyncRead + Unpin>(
129        &self,
130        reader: &mut R,
131        s3_path: &str,
132        content_type: &str,
133    ) -> Result<PutStreamResponse, S3Error> {
134        // If the file is smaller CHUNK_SIZE, just do a regular upload.
135        // Otherwise perform a multi-part upload.
136        let first_chunk = crate::utils::read_chunk_async(reader).await?;
137        if first_chunk.len() < CHUNK_SIZE {
138            let total_size = first_chunk.len();
139            let response_data = self
140                .put_object_with_content_type(s3_path, first_chunk.as_slice(), content_type)
141                .await?;
142            if response_data.status_code() >= 300 {
143                return Err(error_from_response_data(response_data)?);
144            }
145            return Ok(PutStreamResponse::new(
146                response_data.status_code(),
147                total_size,
148            ));
149        }
150
151        let msg = self
152            .initiate_multipart_upload(s3_path, content_type)
153            .await?;
154        let path = msg.key;
155        let upload_id = &msg.upload_id;
156
157        let mut part_number: u32 = 0;
158        let mut etags = Vec::new();
159
160        // Collect request handles
161        let mut handles = vec![];
162        let mut total_size = 0;
163        loop {
164            let chunk = if part_number == 0 {
165                first_chunk.clone()
166            } else {
167                crate::utils::read_chunk_async(reader).await?
168            };
169            total_size += chunk.len();
170
171            let done = chunk.len() < CHUNK_SIZE;
172
173            // Start chunk upload
174            part_number += 1;
175            handles.push(self.make_multipart_request(
176                &path,
177                chunk,
178                part_number,
179                upload_id,
180                content_type,
181            ));
182
183            if done {
184                break;
185            }
186        }
187
188        // Wait for all chunks to finish (or fail)
189        let responses = futures::future::join_all(handles).await;
190
191        for response in responses {
192            let response_data = response?;
193            if !(200..300).contains(&response_data.status_code()) {
194                // if chunk upload failed - abort the upload
195                match self.abort_upload(&path, upload_id).await {
196                    Ok(_) => {
197                        return Err(error_from_response_data(response_data)?);
198                    }
199                    Err(error) => {
200                        return Err(error);
201                    }
202                }
203            }
204
205            let etag = response_data.as_str()?;
206            etags.push(etag.to_string());
207        }
208
209        // Finish the upload
210        let inner_data = etags
211            .clone()
212            .into_iter()
213            .enumerate()
214            .map(|(i, x)| Part {
215                etag: x,
216                part_number: i as u32 + 1,
217            })
218            .collect::<Vec<Part>>();
219        let response_data = self
220            .complete_multipart_upload(&path, &msg.upload_id, inner_data)
221            .await?;
222
223        Ok(PutStreamResponse::new(
224            response_data.status_code(),
225            total_size,
226        ))
227    }
228
229    /// Initiate multipart upload to s3.
230    pub async fn initiate_multipart_upload(
231        &self,
232        s3_path: &str,
233        content_type: &str,
234    ) -> Result<InitiateMultipartUploadResponse, S3Error> {
235        let command = Command::InitiateMultipartUpload { content_type };
236        let request = RequestImpl::new(self, s3_path, command)?;
237        let response_data = request.response_data(false).await?;
238        if response_data.status_code() >= 300 {
239            return Err(error_from_response_data(response_data)?);
240        }
241
242        let msg: InitiateMultipartUploadResponse =
243            quick_xml::de::from_str(response_data.as_str()?)?;
244        Ok(msg)
245    }
246
247    /// Upload a streamed multipart chunk to s3 using a previously initiated multipart upload
248    pub async fn put_multipart_stream<R: Read + Unpin>(
249        &self,
250        reader: &mut R,
251        path: &str,
252        part_number: u32,
253        upload_id: &str,
254        content_type: &str,
255    ) -> Result<Part, S3Error> {
256        let chunk = crate::utils::read_chunk(reader)?;
257        self.put_multipart_chunk(chunk, path, part_number, upload_id, content_type)
258            .await
259    }
260
261    /// Upload a buffered multipart chunk to s3 using a previously initiated multipart upload
262    pub async fn put_multipart_chunk(
263        &self,
264        chunk: Vec<u8>,
265        path: &str,
266        part_number: u32,
267        upload_id: &str,
268        content_type: &str,
269    ) -> Result<Part, S3Error> {
270        let command = Command::PutObject {
271            // part_number,
272            content: &chunk,
273            multipart: Some(Multipart::new(part_number, upload_id)), // upload_id: &msg.upload_id,
274            content_type,
275        };
276        let request = RequestImpl::new(self, path, command)?;
277        let response_data = request.response_data(true).await?;
278        if !(200..300).contains(&response_data.status_code()) {
279            // if chunk upload failed - abort the upload
280            match self.abort_upload(path, upload_id).await {
281                Ok(_) => {
282                    return Err(error_from_response_data(response_data)?);
283                }
284                Err(error) => {
285                    return Err(error);
286                }
287            }
288        }
289        let etag = response_data.as_str()?;
290        Ok(Part {
291            etag: etag.to_string(),
292            part_number,
293        })
294    }
295
296    /// Completes a previously initiated multipart upload, with optional final data chunks
297    pub async fn complete_multipart_upload(
298        &self,
299        path: &str,
300        upload_id: &str,
301        parts: Vec<Part>,
302    ) -> Result<ResponseData, S3Error> {
303        let data = CompleteMultipartUploadData { parts };
304        let complete = Command::CompleteMultipartUpload { upload_id, data };
305        let complete_request = RequestImpl::new(self, path, complete)?;
306        complete_request.response_data(false).await
307    }
308
309    /// Put into an S3 bucket, with explicit content-type.
310    ///
311    /// # Example:
312    ///
313    /// ```no_run
314    /// use s3::bucket::Bucket;
315    /// use s3::creds::Credentials;
316    /// use anyhow::Result;
317    ///
318    /// # #[tokio::main]
319    /// # async fn main() -> Result<()> {
320    ///
321    /// let bucket_name = "rust-s3-test";
322    /// let region = "us-east-1".parse()?;
323    /// let credentials = Credentials::default()?;
324    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
325    /// let content = "I want to go to S3".as_bytes();
326    ///
327    /// let response_data = bucket.put_object_with_content_type("/test.file", content, "text/plain").await?;
328    /// #
329    /// # Ok(())
330    /// # }
331    /// ```
332    pub async fn put_object_with_content_type<S: AsRef<str>>(
333        &self,
334        path: S,
335        content: &[u8],
336        content_type: &str,
337    ) -> Result<ResponseData, S3Error> {
338        let command = Command::PutObject {
339            content,
340            content_type,
341            multipart: None,
342        };
343        let request = RequestImpl::new(self, path.as_ref(), command)?;
344        request.response_data(true).await
345    }
346
347    /// Put into an S3 bucket.
348    ///
349    /// # Example:
350    ///
351    /// ```no_run
352    /// use s3::bucket::Bucket;
353    /// use s3::creds::Credentials;
354    /// use anyhow::Result;
355    ///
356    /// # #[tokio::main]
357    /// # async fn main() -> Result<()> {
358    ///
359    /// let bucket_name = "rust-s3-test";
360    /// let region = "us-east-1".parse()?;
361    /// let credentials = Credentials::default()?;
362    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
363    /// let content = "I want to go to S3".as_bytes();
364    ///
365    /// let response_data = bucket.put_object("/test.file", content).await?;
366    /// #
367    /// # Ok(())
368    /// # }
369    /// ```
370    pub async fn put_object<S: AsRef<str>>(
371        &self,
372        path: S,
373        content: &[u8],
374    ) -> Result<ResponseData, S3Error> {
375        self.put_object_with_content_type(path, content, "application/octet-stream")
376            .await
377    }
378
379    /// Tag an S3 object.
380    ///
381    /// # Example:
382    ///
383    /// ```no_run
384    /// use s3::bucket::Bucket;
385    /// use s3::creds::Credentials;
386    /// use anyhow::Result;
387    ///
388    /// # #[tokio::main]
389    /// # async fn main() -> Result<()> {
390    ///
391    /// let bucket_name = "rust-s3-test";
392    /// let region = "us-east-1".parse()?;
393    /// let credentials = Credentials::default()?;
394    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
395    ///
396    /// let response_data = bucket.put_object_tagging("/test.file", &[("Tag1", "Value1"), ("Tag2", "Value2")]).await?;
397    ///
398    /// #
399    /// # Ok(())
400    /// # }
401    /// ```
402    pub async fn put_object_tagging<S: AsRef<str>>(
403        &self,
404        path: &str,
405        tags: &[(S, S)],
406    ) -> Result<ResponseData, S3Error> {
407        let content = self._tags_xml(tags);
408        let command = Command::PutObjectTagging { tags: &content };
409        let request = RequestImpl::new(self, path, command)?;
410        request.response_data(false).await
411    }
412
413    /// Abort a running multipart upload.
414    ///
415    /// # Example:
416    ///
417    /// ```no_run
418    /// use s3::bucket::Bucket;
419    /// use s3::creds::Credentials;
420    /// use anyhow::Result;
421    ///
422    /// # #[tokio::main]
423    /// # async fn main() -> Result<()> {
424    ///
425    /// let bucket_name = "rust-s3-test";
426    /// let region = "us-east-1".parse()?;
427    /// let credentials = Credentials::default()?;
428    /// let bucket = Bucket::new(bucket_name, region, credentials)?;
429    ///
430    /// let results = bucket.abort_upload("/some/file.txt", "ZDFjM2I0YmEtMzU3ZC00OTQ1LTlkNGUtMTgxZThjYzIwNjA2").await?;
431    ///
432    /// #
433    /// # Ok(())
434    /// # }
435    /// ```
436    pub async fn abort_upload(&self, key: &str, upload_id: &str) -> Result<(), S3Error> {
437        let abort = Command::AbortMultipartUpload { upload_id };
438        let abort_request = RequestImpl::new(self, key, abort)?;
439        let response_data = abort_request.response_data(false).await?;
440
441        if (200..300).contains(&response_data.status_code()) {
442            Ok(())
443        } else {
444            let utf8_content = String::from_utf8(response_data.as_slice().to_vec())?;
445            Err(S3Error::HttpFailWithBody(
446                response_data.status_code(),
447                utf8_content,
448            ))
449        }
450    }
451}