s3/bucket/put.rs
1use crate::bucket::CorsConfiguration;
2use crate::bucket::{
3 error_from_response_data, Bucket, CompleteMultipartUploadData, InitiateMultipartUploadResponse,
4 Part, Read, Request, CHUNK_SIZE,
5};
6use crate::command::{Command, Multipart};
7use crate::error::S3Error;
8use crate::request::{RequestImpl, ResponseData};
9
10use crate::bucket::PutStreamResponse;
11use crate::request::AsyncRead;
12
13impl Bucket {
14 pub async fn put_bucket_cors(
15 &self,
16 cors_config: CorsConfiguration,
17 ) -> Result<ResponseData, S3Error> {
18 let command = Command::PutBucketCors {
19 configuration: cors_config,
20 };
21 let request = RequestImpl::new(self, "?cors", command)?;
22 request.response_data(false).await
23 }
24
25 /// Stream file from local path to s3, generic over T: Write.
26 ///
27 /// # Example:
28 ///
29 /// ```rust,no_run
30 /// use s3::bucket::Bucket;
31 /// use s3::creds::Credentials;
32 /// use anyhow::Result;
33 /// use std::fs::File;
34 /// use std::io::Write;
35 ///
36 /// # #[tokio::main]
37 /// # async fn main() -> Result<()> {
38 ///
39 /// let bucket_name = "rust-s3-test";
40 /// let region = "us-east-1".parse()?;
41 /// let credentials = Credentials::default()?;
42 /// let bucket = Bucket::new(bucket_name, region, credentials)?;
43 /// let path = "path";
44 /// let test: Vec<u8> = (0..1000).map(|_| 42).collect();
45 /// let mut file = File::create(path)?;
46 /// // tokio open file
47 /// let mut async_output_file = tokio::fs::File::create("async_output_file").await.expect("Unable to create file");
48 /// file.write_all(&test)?;
49 ///
50 /// let status_code = bucket.put_object_stream(&mut async_output_file, "/path").await?;
51 ///
52 /// #
53 /// # Ok(())
54 /// # }
55 /// ```
56 pub async fn put_object_stream<R: AsyncRead + Unpin>(
57 &self,
58 reader: &mut R,
59 s3_path: impl AsRef<str>,
60 ) -> Result<PutStreamResponse, S3Error> {
61 self._put_object_stream_with_content_type(
62 reader,
63 s3_path.as_ref(),
64 "application/octet-stream",
65 )
66 .await
67 }
68
69 /// Stream file from local path to s3, generic over T: Write with explicit content type.
70 ///
71 /// # Example:
72 ///
73 /// ```rust,no_run
74 /// use s3::bucket::Bucket;
75 /// use s3::creds::Credentials;
76 /// use anyhow::Result;
77 /// use std::fs::File;
78 /// use std::io::Write;
79 ///
80 /// # #[tokio::main]
81 /// # async fn main() -> Result<()> {
82 ///
83 /// let bucket_name = "rust-s3-test";
84 /// let region = "us-east-1".parse()?;
85 /// let credentials = Credentials::default()?;
86 /// let bucket = Bucket::new(bucket_name, region, credentials)?;
87 /// let path = "path";
88 /// let test: Vec<u8> = (0..1000).map(|_| 42).collect();
89 /// let mut file = File::create(path)?;
90 /// file.write_all(&test)?;
91 ///
92 /// let mut async_output_file = tokio::fs::File::create("async_output_file").await.expect("Unable to create file");
93 ///
94 /// let status_code = bucket
95 /// .put_object_stream_with_content_type(&mut async_output_file, "/path", "application/octet-stream")
96 /// .await?;
97 ///
98 /// # Ok(())
99 /// # }
100 /// ```
101 pub async fn put_object_stream_with_content_type<R: AsyncRead + Unpin>(
102 &self,
103 reader: &mut R,
104 s3_path: impl AsRef<str>,
105 content_type: impl AsRef<str>,
106 ) -> Result<PutStreamResponse, S3Error> {
107 self._put_object_stream_with_content_type(reader, s3_path.as_ref(), content_type.as_ref())
108 .await
109 }
110
111 async fn make_multipart_request(
112 &self,
113 path: &str,
114 chunk: Vec<u8>,
115 part_number: u32,
116 upload_id: &str,
117 content_type: &str,
118 ) -> Result<ResponseData, S3Error> {
119 let command = Command::PutObject {
120 content: &chunk,
121 multipart: Some(Multipart::new(part_number, upload_id)), // upload_id: &msg.upload_id,
122 content_type,
123 };
124 let request = RequestImpl::new(self, path, command)?;
125 request.response_data(true).await
126 }
127
128 async fn _put_object_stream_with_content_type<R: AsyncRead + Unpin>(
129 &self,
130 reader: &mut R,
131 s3_path: &str,
132 content_type: &str,
133 ) -> Result<PutStreamResponse, S3Error> {
134 // If the file is smaller CHUNK_SIZE, just do a regular upload.
135 // Otherwise perform a multi-part upload.
136 let first_chunk = crate::utils::read_chunk_async(reader).await?;
137 if first_chunk.len() < CHUNK_SIZE {
138 let total_size = first_chunk.len();
139 let response_data = self
140 .put_object_with_content_type(s3_path, first_chunk.as_slice(), content_type)
141 .await?;
142 if response_data.status_code() >= 300 {
143 return Err(error_from_response_data(response_data)?);
144 }
145 return Ok(PutStreamResponse::new(
146 response_data.status_code(),
147 total_size,
148 ));
149 }
150
151 let msg = self
152 .initiate_multipart_upload(s3_path, content_type)
153 .await?;
154 let path = msg.key;
155 let upload_id = &msg.upload_id;
156
157 let mut part_number: u32 = 0;
158 let mut etags = Vec::new();
159
160 // Collect request handles
161 let mut handles = vec![];
162 let mut total_size = 0;
163 loop {
164 let chunk = if part_number == 0 {
165 first_chunk.clone()
166 } else {
167 crate::utils::read_chunk_async(reader).await?
168 };
169 total_size += chunk.len();
170
171 let done = chunk.len() < CHUNK_SIZE;
172
173 // Start chunk upload
174 part_number += 1;
175 handles.push(self.make_multipart_request(
176 &path,
177 chunk,
178 part_number,
179 upload_id,
180 content_type,
181 ));
182
183 if done {
184 break;
185 }
186 }
187
188 // Wait for all chunks to finish (or fail)
189 let responses = futures::future::join_all(handles).await;
190
191 for response in responses {
192 let response_data = response?;
193 if !(200..300).contains(&response_data.status_code()) {
194 // if chunk upload failed - abort the upload
195 match self.abort_upload(&path, upload_id).await {
196 Ok(_) => {
197 return Err(error_from_response_data(response_data)?);
198 }
199 Err(error) => {
200 return Err(error);
201 }
202 }
203 }
204
205 let etag = response_data.as_str()?;
206 etags.push(etag.to_string());
207 }
208
209 // Finish the upload
210 let inner_data = etags
211 .clone()
212 .into_iter()
213 .enumerate()
214 .map(|(i, x)| Part {
215 etag: x,
216 part_number: i as u32 + 1,
217 })
218 .collect::<Vec<Part>>();
219 let response_data = self
220 .complete_multipart_upload(&path, &msg.upload_id, inner_data)
221 .await?;
222
223 Ok(PutStreamResponse::new(
224 response_data.status_code(),
225 total_size,
226 ))
227 }
228
229 /// Initiate multipart upload to s3.
230 pub async fn initiate_multipart_upload(
231 &self,
232 s3_path: &str,
233 content_type: &str,
234 ) -> Result<InitiateMultipartUploadResponse, S3Error> {
235 let command = Command::InitiateMultipartUpload { content_type };
236 let request = RequestImpl::new(self, s3_path, command)?;
237 let response_data = request.response_data(false).await?;
238 if response_data.status_code() >= 300 {
239 return Err(error_from_response_data(response_data)?);
240 }
241
242 let msg: InitiateMultipartUploadResponse =
243 quick_xml::de::from_str(response_data.as_str()?)?;
244 Ok(msg)
245 }
246
247 /// Upload a streamed multipart chunk to s3 using a previously initiated multipart upload
248 pub async fn put_multipart_stream<R: Read + Unpin>(
249 &self,
250 reader: &mut R,
251 path: &str,
252 part_number: u32,
253 upload_id: &str,
254 content_type: &str,
255 ) -> Result<Part, S3Error> {
256 let chunk = crate::utils::read_chunk(reader)?;
257 self.put_multipart_chunk(chunk, path, part_number, upload_id, content_type)
258 .await
259 }
260
261 /// Upload a buffered multipart chunk to s3 using a previously initiated multipart upload
262 pub async fn put_multipart_chunk(
263 &self,
264 chunk: Vec<u8>,
265 path: &str,
266 part_number: u32,
267 upload_id: &str,
268 content_type: &str,
269 ) -> Result<Part, S3Error> {
270 let command = Command::PutObject {
271 // part_number,
272 content: &chunk,
273 multipart: Some(Multipart::new(part_number, upload_id)), // upload_id: &msg.upload_id,
274 content_type,
275 };
276 let request = RequestImpl::new(self, path, command)?;
277 let response_data = request.response_data(true).await?;
278 if !(200..300).contains(&response_data.status_code()) {
279 // if chunk upload failed - abort the upload
280 match self.abort_upload(path, upload_id).await {
281 Ok(_) => {
282 return Err(error_from_response_data(response_data)?);
283 }
284 Err(error) => {
285 return Err(error);
286 }
287 }
288 }
289 let etag = response_data.as_str()?;
290 Ok(Part {
291 etag: etag.to_string(),
292 part_number,
293 })
294 }
295
296 /// Completes a previously initiated multipart upload, with optional final data chunks
297 pub async fn complete_multipart_upload(
298 &self,
299 path: &str,
300 upload_id: &str,
301 parts: Vec<Part>,
302 ) -> Result<ResponseData, S3Error> {
303 let data = CompleteMultipartUploadData { parts };
304 let complete = Command::CompleteMultipartUpload { upload_id, data };
305 let complete_request = RequestImpl::new(self, path, complete)?;
306 complete_request.response_data(false).await
307 }
308
309 /// Put into an S3 bucket, with explicit content-type.
310 ///
311 /// # Example:
312 ///
313 /// ```no_run
314 /// use s3::bucket::Bucket;
315 /// use s3::creds::Credentials;
316 /// use anyhow::Result;
317 ///
318 /// # #[tokio::main]
319 /// # async fn main() -> Result<()> {
320 ///
321 /// let bucket_name = "rust-s3-test";
322 /// let region = "us-east-1".parse()?;
323 /// let credentials = Credentials::default()?;
324 /// let bucket = Bucket::new(bucket_name, region, credentials)?;
325 /// let content = "I want to go to S3".as_bytes();
326 ///
327 /// let response_data = bucket.put_object_with_content_type("/test.file", content, "text/plain").await?;
328 /// #
329 /// # Ok(())
330 /// # }
331 /// ```
332 pub async fn put_object_with_content_type<S: AsRef<str>>(
333 &self,
334 path: S,
335 content: &[u8],
336 content_type: &str,
337 ) -> Result<ResponseData, S3Error> {
338 let command = Command::PutObject {
339 content,
340 content_type,
341 multipart: None,
342 };
343 let request = RequestImpl::new(self, path.as_ref(), command)?;
344 request.response_data(true).await
345 }
346
347 /// Put into an S3 bucket.
348 ///
349 /// # Example:
350 ///
351 /// ```no_run
352 /// use s3::bucket::Bucket;
353 /// use s3::creds::Credentials;
354 /// use anyhow::Result;
355 ///
356 /// # #[tokio::main]
357 /// # async fn main() -> Result<()> {
358 ///
359 /// let bucket_name = "rust-s3-test";
360 /// let region = "us-east-1".parse()?;
361 /// let credentials = Credentials::default()?;
362 /// let bucket = Bucket::new(bucket_name, region, credentials)?;
363 /// let content = "I want to go to S3".as_bytes();
364 ///
365 /// let response_data = bucket.put_object("/test.file", content).await?;
366 /// #
367 /// # Ok(())
368 /// # }
369 /// ```
370 pub async fn put_object<S: AsRef<str>>(
371 &self,
372 path: S,
373 content: &[u8],
374 ) -> Result<ResponseData, S3Error> {
375 self.put_object_with_content_type(path, content, "application/octet-stream")
376 .await
377 }
378
379 /// Tag an S3 object.
380 ///
381 /// # Example:
382 ///
383 /// ```no_run
384 /// use s3::bucket::Bucket;
385 /// use s3::creds::Credentials;
386 /// use anyhow::Result;
387 ///
388 /// # #[tokio::main]
389 /// # async fn main() -> Result<()> {
390 ///
391 /// let bucket_name = "rust-s3-test";
392 /// let region = "us-east-1".parse()?;
393 /// let credentials = Credentials::default()?;
394 /// let bucket = Bucket::new(bucket_name, region, credentials)?;
395 ///
396 /// let response_data = bucket.put_object_tagging("/test.file", &[("Tag1", "Value1"), ("Tag2", "Value2")]).await?;
397 ///
398 /// #
399 /// # Ok(())
400 /// # }
401 /// ```
402 pub async fn put_object_tagging<S: AsRef<str>>(
403 &self,
404 path: &str,
405 tags: &[(S, S)],
406 ) -> Result<ResponseData, S3Error> {
407 let content = self._tags_xml(tags);
408 let command = Command::PutObjectTagging { tags: &content };
409 let request = RequestImpl::new(self, path, command)?;
410 request.response_data(false).await
411 }
412
413 /// Abort a running multipart upload.
414 ///
415 /// # Example:
416 ///
417 /// ```no_run
418 /// use s3::bucket::Bucket;
419 /// use s3::creds::Credentials;
420 /// use anyhow::Result;
421 ///
422 /// # #[tokio::main]
423 /// # async fn main() -> Result<()> {
424 ///
425 /// let bucket_name = "rust-s3-test";
426 /// let region = "us-east-1".parse()?;
427 /// let credentials = Credentials::default()?;
428 /// let bucket = Bucket::new(bucket_name, region, credentials)?;
429 ///
430 /// let results = bucket.abort_upload("/some/file.txt", "ZDFjM2I0YmEtMzU3ZC00OTQ1LTlkNGUtMTgxZThjYzIwNjA2").await?;
431 ///
432 /// #
433 /// # Ok(())
434 /// # }
435 /// ```
436 pub async fn abort_upload(&self, key: &str, upload_id: &str) -> Result<(), S3Error> {
437 let abort = Command::AbortMultipartUpload { upload_id };
438 let abort_request = RequestImpl::new(self, key, abort)?;
439 let response_data = abort_request.response_data(false).await?;
440
441 if (200..300).contains(&response_data.status_code()) {
442 Ok(())
443 } else {
444 let utf8_content = String::from_utf8(response_data.as_slice().to_vec())?;
445 Err(S3Error::HttpFailWithBody(
446 response_data.status_code(),
447 utf8_content,
448 ))
449 }
450 }
451}