cobalt_aws/s3/async_multipart_put_object.rs

//! Provides ways of interacting with objects in S3.

// Standard library imports
use std::mem;
use std::pin::Pin;

// External crates
use anyhow::Context as _;
use aws_sdk_s3::primitives::ByteStream;
use aws_sdk_s3::{
    operation::{
        complete_multipart_upload::{CompleteMultipartUploadError, CompleteMultipartUploadOutput},
        upload_part::{UploadPartError, UploadPartOutput},
    },
    types::{CompletedMultipartUpload, CompletedPart, ObjectCannedAcl, Part},
    Client,
};
use bytesize::{GIB, MIB};
use derive_more::Debug;
use futures::{
    future::BoxFuture,
    io::Error,
    task::{Context, Poll},
    AsyncWrite, Future, FutureExt, TryFutureExt,
};
use tracing::{event, instrument, Level};

// Internal project imports
use crate::s3::S3Object;
use crate::types::SdkError;
31
/// Convenience alias for the boxed future returned by an `upload_part` call.
type MultipartUploadFuture<'a> =
    BoxFuture<'a, Result<(UploadPartOutput, i32), SdkError<UploadPartError>>>;
/// Convenience alias for the boxed future returned by the `complete_multipart_upload` call.
type CompleteMultipartUploadFuture<'a> =
    BoxFuture<'a, Result<CompleteMultipartUploadOutput, SdkError<CompleteMultipartUploadError>>>;
/// Holds state for the [AsyncMultipartUpload].
#[derive(Debug)]
enum AsyncMultipartUploadState<'a> {
    /// Placeholder state, used with `mem::replace` to avoid split borrows.
    None,
    /// Bytes are being written.
    Writing {
        /// Multipart uploads that are running.
        #[debug(skip)]
        uploads: Vec<MultipartUploadFuture<'a>>,
        /// Bytes waiting to be written.
        buffer: Vec<u8>,
        /// The next part number to be used.
        part_number: i32,
        /// The completed parts.
        completed_parts: Vec<CompletedPart>,
    },
    /// close() has been called and parts are still uploading.
    CompletingParts {
        #[debug(skip)]
        uploads: Vec<MultipartUploadFuture<'a>>,
        completed_parts: Vec<CompletedPart>,
    },
    /// All parts have been uploaded and the CompleteMultipartUpload request is in flight.
    Completing(#[debug(skip)] CompleteMultipartUploadFuture<'a>),
    /// We have completed writing to S3.
    Closed,
}
67
/// An implementation of [AsyncWrite] for S3 objects using multipart uploads.
/// By using multipart uploads, constant memory usage can be achieved.
///
/// ## Note
/// On failure the multipart upload is not aborted. It is up to the
/// caller to call the S3 `abortMultipartUpload` API when required.
///
/// ```no_run
/// use cobalt_aws::s3::{AsyncMultipartUpload, Client, S3Object};
/// use cobalt_aws::config::load_from_env;
/// use futures::AsyncWriteExt;
///
/// # tokio_test::block_on(async {
/// let shared_config = load_from_env().await.unwrap();
/// let client = Client::new(&shared_config);
/// let dst = S3Object::new("my-bucket", "my-key");
/// let part_size = 5 * 1_048_576;
/// let mut writer = AsyncMultipartUpload::new(&client, &dst, part_size, None).await.unwrap();
/// let buffer_len = 6 * 1_048_576;
/// // Each part is uploaded as it becomes available.
/// writer.write_all(&vec![0; buffer_len]).await.unwrap();
///
/// // The pending parts are uploaded and the multipart upload is completed
/// // on close.
/// writer.close().await.unwrap();
/// # })
/// ```
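/// ## Aborting on failure
/// A minimal sketch of cleaning up after an error, assuming the standard
/// `aws_sdk_s3` `abort_multipart_upload` API; bucket and key are shown as
/// placeholder values and error handling is elided:
/// ```no_run
/// # use cobalt_aws::s3::{AsyncMultipartUpload, Client, S3Object};
/// # use cobalt_aws::config::load_from_env;
/// # tokio_test::block_on(async {
/// # let shared_config = load_from_env().await.unwrap();
/// # let client = Client::new(&shared_config);
/// # let dst = S3Object::new("my-bucket", "my-key");
/// let writer = AsyncMultipartUpload::new(&client, &dst, 5 * 1_048_576, None).await.unwrap();
/// // ... if writing fails, abort so S3 does not retain the uploaded parts:
/// client
///     .abort_multipart_upload()
///     .bucket("my-bucket")
///     .key("my-key")
///     .upload_id(writer.get_upload_id())
///     .send()
///     .await
///     .unwrap();
/// # })
/// ```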

#[derive(Debug)]
pub struct AsyncMultipartUpload<'a> {
    client: &'a Client,
    dst: S3Object,
    upload_id: String,
    part_size: usize,
    max_uploading_parts: usize,
    state: AsyncMultipartUploadState<'a>,
}

/// Minimum size of a multipart upload part.
const MIN_PART_SIZE: usize = 5_usize * MIB as usize; // 5 MiB
/// Maximum size of a multipart upload part.
const MAX_PART_SIZE: usize = 5_usize * GIB as usize; // 5 GiB

/// Default number of parts which can be uploaded
/// concurrently.
const DEFAULT_MAX_UPLOADING_PARTS: usize = 100;
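// Note: per the capacity calculation in poll_write, up to
// `max_uploading_parts * part_size` bytes may be buffered or in flight at
// once; with the defaults and the minimum part size, that is
// 100 * 5 MiB = 500 MiB.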

impl<'a> AsyncMultipartUpload<'a> {
    /// Create a new [AsyncMultipartUpload].
    ///
    /// * `client`              - S3 client to use.
    /// * `dst`                 - The [S3Object] to write the object into.
    /// * `part_size`           - How large, in bytes, each part should be. Must be
    ///   at least 5 MiB and at most 5 GiB.
    /// * `max_uploading_parts` - How many parts to upload concurrently.
    ///   Must be greater than 0 (defaults to 100).
    #[instrument(skip(client))]
    pub async fn new(
        client: &'a Client,
        dst: &S3Object,
        part_size: usize,
        max_uploading_parts: Option<usize>,
    ) -> anyhow::Result<AsyncMultipartUpload<'a>> {
        event!(Level::DEBUG, "New AsyncMultipartUpload");
        if part_size < MIN_PART_SIZE {
            anyhow::bail!("part_size was {part_size}, cannot be less than {MIN_PART_SIZE}")
        }
        if part_size > MAX_PART_SIZE {
            anyhow::bail!("part_size was {part_size}, cannot be more than {MAX_PART_SIZE}")
        }

        // Check that the user did not pass an invalid parameter.
        if let Some(0) = max_uploading_parts {
            anyhow::bail!("Max uploading parts must not be 0")
        }

        let result = client
            .create_multipart_upload()
            .bucket(&dst.bucket)
            .key(&dst.key)
            .acl(ObjectCannedAcl::BucketOwnerFullControl)
            .send()
            .await?;

        let upload_id = result.upload_id().context("Expected Upload Id")?;

        Ok(AsyncMultipartUpload {
            client,
            dst: dst.clone(),
            upload_id: upload_id.into(),
            part_size,
            max_uploading_parts: max_uploading_parts.unwrap_or(DEFAULT_MAX_UPLOADING_PARTS),
            state: AsyncMultipartUploadState::Writing {
                uploads: vec![],
                buffer: Vec::with_capacity(part_size),
                part_number: 1,
                completed_parts: vec![],
            },
        })
    }

    /// Import an existing [AsyncMultipartUpload] to continue uploading.
    ///
    /// * `client`              - S3 client to use.
    /// * `upload_id`           - The multipart upload ID to resume.
    /// * `dst`                 - The [S3Object] to write the object into.
    /// * `part_size`           - How large, in bytes, each part should be. Must be
    ///   at least 5 MiB and at most 5 GiB.
    /// * `max_uploading_parts` - How many parts to upload concurrently.
    ///   Must be greater than 0 (defaults to 100).
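    ///
    /// ## Example
    /// A minimal sketch of resuming an upload, assuming `upload_id` was
    /// retained from an earlier [AsyncMultipartUpload] (a hypothetical value
    /// is shown):
    /// ```no_run
    /// # use cobalt_aws::s3::{AsyncMultipartUpload, Client, S3Object};
    /// # use cobalt_aws::config::load_from_env;
    /// # use futures::AsyncWriteExt;
    /// # tokio_test::block_on(async {
    /// # let shared_config = load_from_env().await.unwrap();
    /// # let client = Client::new(&shared_config);
    /// # let dst = S3Object::new("my-bucket", "my-key");
    /// let upload_id = "existing-upload-id".to_string(); // hypothetical ID
    /// let mut writer =
    ///     AsyncMultipartUpload::from(&client, upload_id, &dst, 5 * 1_048_576, None)
    ///         .await
    ///         .unwrap();
    /// writer.write_all(&vec![0; 5 * 1_048_576]).await.unwrap();
    /// writer.close().await.unwrap();
    /// # })
    /// ```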
    #[instrument(skip(client))]
    pub async fn from(
        client: &'a Client,
        upload_id: String,
        dst: &S3Object,
        part_size: usize,
        max_uploading_parts: Option<usize>,
    ) -> anyhow::Result<AsyncMultipartUpload<'a>> {
        event!(Level::DEBUG, "Importing existing AsyncMultipartUpload");
        if part_size < MIN_PART_SIZE {
            anyhow::bail!("part_size was {part_size}, cannot be less than {MIN_PART_SIZE}")
        }
        if part_size > MAX_PART_SIZE {
            anyhow::bail!("part_size was {part_size}, cannot be more than {MAX_PART_SIZE}")
        }

        // Check that the user did not pass an invalid parameter.
        if let Some(0) = max_uploading_parts {
            anyhow::bail!("Max uploading parts must not be 0")
        }

        let mut parts: Vec<Part> = vec![];

        let mut list_parts_result = client
            .list_parts()
            .bucket(&dst.bucket)
            .key(&dst.key)
            .upload_id(&upload_id)
            .into_paginator()
            .send();

        // Use a paginator to collect all parts, since a multipart upload can have
        // more than 1000 parts (the default and maximum page size of list_parts()).
        // `.into_paginator().items().send()` would give a flattened stream of items,
        // but any use of those items produced a future that never resolved, hanging
        // the program indefinitely, so we paginate page by page instead.
        while let Some(Ok(page)) = list_parts_result.next().await {
            parts.append(&mut page.parts().to_vec());

            if !page.is_truncated().unwrap_or(false) {
                break;
            }
        }

        let completed_parts: Vec<CompletedPart> = parts
            .iter()
            .map(|part| {
                // Part exposes Option<&str> where CompletedPart wants Option<String>;
                // convert the &str values to Strings while retaining the Options.
                let e_tag = part.e_tag().map(|s| s.to_string());
                let checksum_crc32 = part.checksum_crc32().map(|s| s.to_string());
                let checksum_crc32_c = part.checksum_crc32_c().map(|s| s.to_string());
                let checksum_sha1 = part.checksum_sha1().map(|s| s.to_string());
                let checksum_sha256 = part.checksum_sha256().map(|s| s.to_string());

                Ok::<CompletedPart, anyhow::Error>(
                    CompletedPart::builder()
                        .set_e_tag(e_tag)
                        .set_checksum_crc32(checksum_crc32)
                        .set_checksum_crc32_c(checksum_crc32_c)
                        .set_checksum_sha1(checksum_sha1)
                        .set_checksum_sha256(checksum_sha256)
                        .part_number(part.part_number().context("Expected part number")?)
                        .build(),
                )
            })
            .collect::<anyhow::Result<Vec<_>>>()?;

        // The next part number is one past the highest part number already uploaded.
        let next_part_number = parts
            .iter()
            .filter_map(|p| p.part_number())
            .max()
            .unwrap_or(0)
            + 1;

        Ok(AsyncMultipartUpload {
            client,
            dst: dst.clone(),
            upload_id,
            part_size,
            max_uploading_parts: max_uploading_parts.unwrap_or(DEFAULT_MAX_UPLOADING_PARTS),
            state: AsyncMultipartUploadState::Writing {
                uploads: vec![],
                buffer: Vec::with_capacity(part_size),
                part_number: next_part_number,
                completed_parts,
            },
        })
    }

    /// Returns the ID of this multipart upload, which can be used to resume it
    /// with [AsyncMultipartUpload::from] or to abort it via the S3 API.
    pub fn get_upload_id(&self) -> &str {
        self.upload_id.as_str()
    }

    #[instrument(skip(buffer))]
    fn upload_part<'b>(&self, buffer: Vec<u8>, part_number: i32) -> MultipartUploadFuture<'b> {
        event!(Level::DEBUG, "Uploading Part");
        self.client
            .upload_part()
            .bucket(&self.dst.bucket)
            .key(&self.dst.key)
            .upload_id(&self.upload_id)
            .part_number(part_number)
            .body(ByteStream::from(buffer))
            .send()
            .map_ok(move |p| (p, part_number))
            .boxed()
    }

    /// Poll each future once; return the results of those that completed and
    /// keep the rest in `futures`.
    fn poll_all<T>(futures: &mut Vec<BoxFuture<T>>, cx: &mut Context) -> Vec<T> {
        let mut pending = vec![];
        let mut complete = vec![];

        while let Some(mut f) = futures.pop() {
            match Pin::new(&mut f).poll(cx) {
                Poll::Ready(result) => complete.push(result),
                Poll::Pending => pending.push(f),
            }
        }
        futures.extend(pending);
        complete
    }

    /// Convert upload-part results into [CompletedPart]s, failing on the first error.
    #[instrument]
    fn try_collect_complete_parts(
        complete_results: Vec<Result<(UploadPartOutput, i32), SdkError<UploadPartError>>>,
    ) -> Result<Vec<CompletedPart>, Error> {
        complete_results
            .into_iter()
            .map(|r| r.map_err(Error::other))
            .map(|r| {
                r.map(|(c, part_number)| {
                    CompletedPart::builder()
                        .set_e_tag(c.e_tag)
                        .part_number(part_number)
                        .build()
                })
            })
            .collect::<Result<Vec<_>, _>>()
    }

    #[instrument]
    fn complete_multipart_upload<'b>(
        &self,
        completed_parts: Vec<CompletedPart>,
    ) -> CompleteMultipartUploadFuture<'b> {
        self.client
            .complete_multipart_upload()
            .key(&self.dst.key)
            .bucket(&self.dst.bucket)
            .upload_id(&self.upload_id)
            .multipart_upload(
                CompletedMultipartUpload::builder()
                    .set_parts(Some(completed_parts))
                    .build(),
            )
            .send()
            .boxed()
    }

    /// Poll in-flight part uploads and move any that finished into `completed_parts`.
    #[instrument(skip(uploads))]
    fn check_uploads(
        uploads: &mut Vec<MultipartUploadFuture<'a>>,
        completed_parts: &mut Vec<CompletedPart>,
        cx: &mut Context,
    ) -> Result<(), Error> {
        let complete_results = AsyncMultipartUpload::poll_all(uploads, cx);
        let new_completed_parts =
            AsyncMultipartUpload::try_collect_complete_parts(complete_results)?;
        completed_parts.extend(new_completed_parts);
        Ok(())
    }
}

impl<'a> AsyncWrite for AsyncMultipartUpload<'a> {
    #[instrument(skip(self, cx, buf))]
    fn poll_write(
        mut self: Pin<&mut AsyncMultipartUpload<'a>>,
        cx: &mut Context<'_>,
        buf: &[u8],
    ) -> Poll<Result<usize, Error>> {
        // Take the state out of `self` to avoid borrowing two disjoint fields
        // at once; attempts at split borrows ran into lifetime issues.
        event!(Level::DEBUG, "Polling write");
        let state = std::mem::replace(&mut self.state, AsyncMultipartUploadState::None);
        match state {
            AsyncMultipartUploadState::Writing {
                mut uploads,
                mut buffer,
                mut part_number,
                mut completed_parts,
            } => {
                event!(Level::DEBUG, "Polling write while Writing");
                // Poll current uploads to make space for incoming data.
                AsyncMultipartUpload::check_uploads(&mut uploads, &mut completed_parts, cx)?;
                // Only take enough bytes to fill the remaining upload capacity.
                let upload_capacity =
                    ((self.max_uploading_parts - uploads.len()) * self.part_size) - buffer.len();
                let bytes_to_write = std::cmp::min(upload_capacity, buf.len());
                // No capacity to upload.
                if bytes_to_write == 0 {
                    uploads.is_empty().then(|| cx.waker().wake_by_ref());
                    self.state = AsyncMultipartUploadState::Writing {
                        uploads,
                        buffer,
                        part_number,
                        completed_parts,
                    };
                    return Poll::Pending;
                }
                buffer.extend(&buf[..bytes_to_write]);

                // Keep starting part uploads until the buffer is smaller than the part size.
                while buffer.len() >= self.part_size {
                    event!(Level::DEBUG, "Starting a new part upload");
                    let mut part = buffer.split_off(self.part_size);
                    // We want to consume the first part of the buffer and upload it to S3.
                    // The split_off call does this but leaves the two halves in the wrong
                    // variables. Use `mem::swap` to swap them in place.
                    std::mem::swap(&mut buffer, &mut part);
                    // Upload a new part.
                    let part_upload = self.upload_part(part, part_number);
                    uploads.push(part_upload);
                    part_number += 1;
                }
                // Poll all uploads, removing completed ones and fetching their results.
                AsyncMultipartUpload::check_uploads(&mut uploads, &mut completed_parts, cx)?;
                // Set the state back.
                self.state = AsyncMultipartUploadState::Writing {
                    uploads,
                    buffer,
                    part_number,
                    completed_parts,
                };
                // Return the number of bytes consumed from the input.
                Poll::Ready(Ok(bytes_to_write))
            }
            AsyncMultipartUploadState::None => Poll::Ready(Err(Error::other(
                "Attempted to .write() when state is None",
            ))),
            _ => Poll::Ready(Err(Error::other("Attempted to .write() after .close()."))),
        }
    }

    #[instrument(skip(self, cx))]
    fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
        // Ensure all pending uploads are completed.
        match &mut self.state {
            AsyncMultipartUploadState::Writing {
                uploads,
                completed_parts,
                ..
            } => {
                event!(Level::DEBUG, "Flushing Multipart Uploads");
                // Poll uploads and collect any completed parts.
                AsyncMultipartUpload::check_uploads(uploads, completed_parts, cx)?;
                if uploads.is_empty() {
                    event!(Level::DEBUG, "All part uploads are complete");
                    Poll::Ready(Ok(()))
                } else {
                    event!(Level::DEBUG, "Waiting for uploads to complete");
                    // Polled futures will trigger a wake.
                    Poll::Pending
                }
            }
            AsyncMultipartUploadState::None => Poll::Ready(Err(Error::other(
                "Attempted to .flush() when state is None",
            ))),
            _ => Poll::Ready(Err(Error::other(
                "Attempted to .flush() writer after .close().",
            ))),
        }
    }

    #[instrument(skip(self, cx))]
    fn poll_close<'b>(
        mut self: Pin<&'b mut AsyncMultipartUpload<'a>>,
        cx: &'b mut Context<'_>,
    ) -> Poll<Result<(), Error>> {
        event!(Level::DEBUG, "Closing Multipart Uploads");
        let state = std::mem::replace(&mut self.state, AsyncMultipartUploadState::None);
        match state {
            AsyncMultipartUploadState::Writing {
                mut buffer,
                mut uploads,
                mut completed_parts,
                part_number,
            } => {
                event!(Level::DEBUG, "Creating final Part Upload");
                // Make space for the final upload.
                AsyncMultipartUpload::check_uploads(&mut uploads, &mut completed_parts, cx)?;
                if uploads.len() >= self.max_uploading_parts {
                    event!(Level::DEBUG, "Waiting for available upload capacity");
                    self.state = AsyncMultipartUploadState::Writing {
                        buffer,
                        uploads,
                        completed_parts,
                        part_number,
                    };
                    return Poll::Pending;
                }
                if !buffer.is_empty() {
                    let buff = mem::take(&mut buffer);
                    let part = self.upload_part(buff, part_number);
                    uploads.push(part);
                }
                // Poll all uploads, removing completed ones and fetching their results.
                AsyncMultipartUpload::check_uploads(&mut uploads, &mut completed_parts, cx)?;
                // If no uploads remain, trigger a wake to move to the next state.
                uploads.is_empty().then(|| cx.waker().wake_by_ref());
                // Change state to CompletingParts.
                self.state = AsyncMultipartUploadState::CompletingParts {
                    uploads,
                    completed_parts,
                };
                Poll::Pending
            }
            AsyncMultipartUploadState::CompletingParts {
                uploads,
                mut completed_parts,
            } if uploads.is_empty() => {
                event!(
                    Level::DEBUG,
                    "AsyncS3Upload all parts uploaded, Completing Upload"
                );
                // Once uploads are empty, change state to Completing.
                // Sorting the parts by part number was surprising but was
                // needed for S3 to complete the upload.
                completed_parts.sort_by_key(|p| p.part_number());
                let completing = self.complete_multipart_upload(completed_parts);
                self.state = AsyncMultipartUploadState::Completing(completing);
                // Trigger a wake to run with the new state and poll the future.
                cx.waker().wake_by_ref();
                Poll::Pending
            }
            AsyncMultipartUploadState::CompletingParts {
                mut uploads,
                mut completed_parts,
            } => {
                event!(
                    Level::DEBUG,
                    "AsyncS3Upload Waiting for All Parts to Upload"
                );
                // Poll all uploads, removing completed ones and fetching their results.
                AsyncMultipartUpload::check_uploads(&mut uploads, &mut completed_parts, cx)?;
                // Trigger a wake if all uploads have completed.
                uploads.is_empty().then(|| cx.waker().wake_by_ref());
                self.state = AsyncMultipartUploadState::CompletingParts {
                    uploads,
                    completed_parts,
                };
                Poll::Pending
            }
            AsyncMultipartUploadState::Completing(mut fut) => {
                // Don't use the ready! macro to wait for the completion future:
                // the state must be set back to Completing if the future is Pending.
                event!(Level::DEBUG, "Waiting for upload complete to finish");
                match Pin::new(&mut fut).poll(cx) {
                    Poll::Pending => {
                        self.state = AsyncMultipartUploadState::Completing(fut);
                        Poll::Pending
                    }
                    Poll::Ready(Ok(_)) => {
                        self.state = AsyncMultipartUploadState::Closed;
                        Poll::Ready(Ok(()))
                    }
                    Poll::Ready(Err(e)) => {
                        self.state = AsyncMultipartUploadState::Closed;
                        Poll::Ready(Err(Error::other(e)))
                    }
                }
            }
            AsyncMultipartUploadState::None => Poll::Ready(Err(Error::other(
                "Attempted to .close() writer with state None",
            ))),
            AsyncMultipartUploadState::Closed => Poll::Ready(Err(Error::other(
                "Attempted to .close() writer after .close().",
            ))),
        }
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::config::load_from_env;
    use crate::localstack;
    use crate::s3::test::*;
    use crate::s3::{AsyncMultipartUpload, S3Object};
    use ::function_name::named;
    use anyhow::Result;
    use aws_sdk_s3::Client;
    use bytesize::MIB;
    use futures::prelude::*;

    #[tokio::test]
    async fn test_part_size_too_small() {
        let shared_config = load_from_env().await.unwrap();
        let client = Client::new(&shared_config);
        let dst = S3Object::new("bucket", "key");
        assert!(AsyncMultipartUpload::new(&client, &dst, 0_usize, None)
            .await
            .is_err())
    }

    #[tokio::test]
    async fn test_part_size_too_big() {
        let shared_config = load_from_env().await.unwrap();
        let client = Client::new(&shared_config);
        let dst = S3Object::new("bucket", "key");
        assert!(
            AsyncMultipartUpload::new(&client, &dst, 5 * GIB as usize + 1, None)
                .await
                .is_err()
        )
    }

    #[tokio::test]
    async fn test_max_uploading_parts_is_zero() {
        let shared_config = load_from_env().await.unwrap();
        let client = Client::new(&shared_config);
        let dst = S3Object::new("bucket", "key");
        assert!(
            AsyncMultipartUpload::new(&client, &dst, 5 * MIB as usize, Some(0))
                .await
                .is_err()
        )
    }

    // *** Integration tests *** //
    // Integration tests should live in src/tests, but the tight coupling with
    // localstack makes it hard to migrate away from this structure.
    async fn localstack_test_client() -> Client {
        localstack::test_utils::wait_for_localstack().await;
        let shared_config = crate::config::load_from_env().await.unwrap();
        let builder = aws_sdk_s3::config::Builder::from(&shared_config)
            .force_path_style(true)
            .build();
        Client::from_conf(builder)
    }

    #[tokio::test]
    #[named]
    async fn test_put_single_part() -> Result<()> {
        let client = localstack_test_client().await;
        let test_bucket = "test-multipart-bucket";
        let mut rng = seeded_rng(function_name!());
        let dst_key = gen_random_file_name(&mut rng);

        create_bucket(&client, test_bucket).await.unwrap();
        let buffer_len = MIB as usize;

        let dst = S3Object::new(test_bucket, &dst_key);
        let mut upload = AsyncMultipartUpload::new(&client, &dst, 5_usize * MIB as usize, None)
            .await
            .unwrap();
        upload.write_all(&vec![0; buffer_len]).await.unwrap();
        upload.close().await.unwrap();
        let body = fetch_bytes(&client, &dst).await.unwrap();
        assert_eq!(body.len(), buffer_len);
        Ok(())
    }

    #[tokio::test]
    #[named]
    async fn test_import_and_continue_an_upload() -> Result<()> {
        let client = localstack_test_client().await;
        let test_bucket = "test-multipart-bucket";
        let mut rng = seeded_rng(function_name!());
        let dst_key = gen_random_file_name(&mut rng);

        create_bucket(&client, test_bucket).await.unwrap();
        let buffer_len = 5_usize * MIB as usize;

        // Create an existing upload to start from.
        let dst = S3Object::new(test_bucket, &dst_key);
        let mut upload = AsyncMultipartUpload::new(&client, &dst, 5_usize * MIB as usize, None)
            .await
            .unwrap();
        upload.write_all(&vec![0; buffer_len]).await.unwrap();
        upload.flush().await.unwrap();

        let mut resumed_upload = AsyncMultipartUpload::from(
            &client,
            upload.get_upload_id().to_string(),
            &dst,
            5_usize * MIB as usize,
            None,
        )
        .await
        .unwrap();

        resumed_upload
            .write_all(&vec![0; buffer_len])
            .await
            .unwrap();
        resumed_upload.close().await.unwrap();

        // The final body has two parts, uploaded by different upload instances.
        let final_body = fetch_bytes(&client, &dst).await.unwrap();
        assert_eq!(final_body.len(), 2_usize * buffer_len);
        Ok(())
    }

    #[tokio::test]
    #[named]
    async fn test_put_10mb() -> Result<()> {
        let client = localstack_test_client().await;
        let test_bucket = "test-multipart-bucket";
        let mut rng = seeded_rng(function_name!());
        let dst_key = gen_random_file_name(&mut rng);

        create_bucket(&client, test_bucket).await.unwrap();
        let buffer_len = 10 * MIB as usize;

        let dst = S3Object::new(test_bucket, &dst_key);
        let mut upload = AsyncMultipartUpload::new(&client, &dst, 5_usize * MIB as usize, None)
            .await
            .unwrap();
        upload.write_all(&vec![0; buffer_len]).await.unwrap();
        upload.close().await.unwrap();
        let body = fetch_bytes(&client, &dst).await.unwrap();
        assert_eq!(body.len(), buffer_len);
        Ok(())
    }

    #[tokio::test]
    #[named]
    async fn test_put_14mb() -> Result<()> {
        let client = localstack_test_client().await;
        let test_bucket = "test-multipart-bucket";
        let mut rng = seeded_rng(function_name!());
        let dst_key = gen_random_file_name(&mut rng);

        create_bucket(&client, test_bucket).await.unwrap();
        let buffer_len = 14 * MIB as usize;

        let dst = S3Object::new(test_bucket, &dst_key);
        let mut upload = AsyncMultipartUpload::new(&client, &dst, 5_usize * MIB as usize, None)
            .await
            .unwrap();
        upload.write_all(&vec![0; buffer_len]).await.unwrap();
        upload.close().await.unwrap();
        let body = fetch_bytes(&client, &dst).await.unwrap();
        assert_eq!(body.len(), buffer_len);
        Ok(())
    }

    #[tokio::test]
    #[named]
    async fn test_put_flush() -> Result<()> {
        let client = localstack_test_client().await;
        let test_bucket = "test-multipart-bucket";
        let mut rng = seeded_rng(function_name!());
        let dst_key = gen_random_file_name(&mut rng);

        create_bucket(&client, test_bucket).await.unwrap();
        let buffer_len = MIB as usize;

        let dst = S3Object::new(test_bucket, &dst_key);
        let mut upload = AsyncMultipartUpload::new(&client, &dst, 5_usize * MIB as usize, Some(1))
            .await
            .unwrap();
        upload.write_all(&vec![0; buffer_len]).await.unwrap();
        upload.flush().await.unwrap();
        upload.close().await.unwrap();
        let body = fetch_bytes(&client, &dst).await.unwrap();
        assert_eq!(body.len(), buffer_len);
        Ok(())
    }

    #[tokio::test]
    #[named]
    async fn test_put_double_close() -> Result<()> {
        let client = localstack_test_client().await;
        let test_bucket = "test-multipart-bucket";
        let mut rng = seeded_rng(function_name!());
        let dst_key = gen_random_file_name(&mut rng);

        create_bucket(&client, test_bucket).await.unwrap();
        let buffer_len = 100_usize;

        let dst = S3Object::new(test_bucket, &dst_key);
        let mut upload = AsyncMultipartUpload::new(&client, &dst, 5_usize * MIB as usize, Some(1))
            .await
            .unwrap();
        upload.write_all(&vec![0; buffer_len]).await.unwrap();
        upload.close().await.unwrap();
        assert!(upload.close().await.is_err());
        Ok(())
    }

    #[tokio::test]
    #[named]
    async fn test_put_write_after_close() -> Result<()> {
        let client = localstack_test_client().await;
        let test_bucket = "test-multipart-bucket";
        let mut rng = seeded_rng(function_name!());
        let dst_key = gen_random_file_name(&mut rng);

        create_bucket(&client, test_bucket).await.unwrap();
        let buffer_len = 100_usize;

        let dst = S3Object::new(test_bucket, &dst_key);
        let mut upload = AsyncMultipartUpload::new(&client, &dst, 5_usize * MIB as usize, Some(1))
            .await
            .unwrap();
        upload.write_all(&vec![0; buffer_len]).await.unwrap();
        upload.close().await.unwrap();
        assert!(upload.write_all(&vec![0; buffer_len]).await.is_err());
        Ok(())
    }

    #[tokio::test]
    #[named]
    async fn test_put_16mb_single_upload() {
        let client = localstack_test_client().await;
        let test_bucket = "test-multipart-bucket";
        let mut rng = seeded_rng(function_name!());
        let dst_key = gen_random_file_name(&mut rng);

        create_bucket(&client, test_bucket).await.unwrap();

        let dst = S3Object::new(test_bucket, &dst_key);
        let mut upload = AsyncMultipartUpload::new(&client, &dst, 5 * MIB as usize, Some(1))
            .await
            .unwrap();

        let data_len = 16 * MIB as usize;

        upload.write_all(&vec![0; data_len]).await.unwrap();
        upload.close().await.unwrap();
        let bytes = fetch_bytes(&client, &S3Object::new(test_bucket, &dst_key))
            .await
            .unwrap();
        assert_eq!(data_len, bytes.len())
    }
}