1use std::mem;
5use std::pin::Pin;
6
7use anyhow::Context as _;
9use aws_sdk_s3::primitives::ByteStream;
10use aws_sdk_s3::{
11 operation::{
12 complete_multipart_upload::{CompleteMultipartUploadError, CompleteMultipartUploadOutput},
13 upload_part::{UploadPartError, UploadPartOutput},
14 },
15 types::{CompletedMultipartUpload, CompletedPart, ObjectCannedAcl, Part},
16 Client,
17};
18use bytesize::{GIB, MIB};
19use derive_more::Debug;
20use futures::{
21 future::BoxFuture,
22 io::Error,
23 task::{Context, Poll},
24 AsyncWrite, Future, FutureExt, TryFutureExt,
25};
26use tracing::{event, instrument, Level};
27
28use crate::s3::S3Object;
30use crate::types::SdkError;
31
32type MultipartUploadFuture<'a> =
34 BoxFuture<'a, Result<(UploadPartOutput, i32), SdkError<UploadPartError>>>;
35type CompleteMultipartUploadFuture<'a> =
37 BoxFuture<'a, Result<CompleteMultipartUploadOutput, SdkError<CompleteMultipartUploadError>>>;
38
39#[derive(Debug)]
41enum AsyncMultipartUploadState<'a> {
42 None,
44 Writing {
46 #[debug(skip)]
48 uploads: Vec<MultipartUploadFuture<'a>>,
49 buffer: Vec<u8>,
51 part_number: i32,
53 completed_parts: Vec<CompletedPart>,
55 },
56 CompletingParts {
58 #[debug(skip)]
59 uploads: Vec<MultipartUploadFuture<'a>>,
60 completed_parts: Vec<CompletedPart>,
61 },
62 Completing(#[debug(skip)] CompleteMultipartUploadFuture<'a>),
64 Closed,
66}
67
68#[derive(Debug)]
97pub struct AsyncMultipartUpload<'a> {
98 client: &'a Client,
99 dst: S3Object,
100 upload_id: String,
101 part_size: usize,
102 max_uploading_parts: usize,
103 state: AsyncMultipartUploadState<'a>,
104}
105
106const MIN_PART_SIZE: usize = 5_usize * MIB as usize; const MAX_PART_SIZE: usize = 5_usize * GIB as usize; const DEFAULT_MAX_UPLOADING_PARTS: usize = 100;
114
115impl<'a> AsyncMultipartUpload<'a> {
116 #[instrument(skip(client))]
125 pub async fn new(
126 client: &'a Client,
127 dst: &S3Object,
128 part_size: usize,
129 max_uploading_parts: Option<usize>,
130 ) -> anyhow::Result<AsyncMultipartUpload<'a>> {
131 event!(Level::DEBUG, "New AsyncMultipartUpload");
132 if part_size < MIN_PART_SIZE {
133 anyhow::bail!("part_size was {part_size}, can not be less than {MIN_PART_SIZE}")
134 }
135 if part_size > MAX_PART_SIZE {
136 anyhow::bail!("part_size was {part_size}, can not be more than {MAX_PART_SIZE}")
137 }
138
139 if let Some(0) = max_uploading_parts {
141 anyhow::bail!("Max uploading parts must not be 0")
142 }
143
144 let result = client
145 .create_multipart_upload()
146 .bucket(&dst.bucket)
147 .key(&dst.key)
148 .acl(ObjectCannedAcl::BucketOwnerFullControl)
149 .send()
150 .await?;
151
152 let upload_id = result.upload_id().context("Expected Upload Id")?;
153
154 Ok(AsyncMultipartUpload {
155 client,
156 dst: dst.clone(),
157 upload_id: upload_id.into(),
158 part_size,
159 max_uploading_parts: max_uploading_parts.unwrap_or(DEFAULT_MAX_UPLOADING_PARTS),
160 state: AsyncMultipartUploadState::Writing {
161 uploads: vec![],
162 buffer: Vec::with_capacity(part_size),
163 part_number: 1,
164 completed_parts: vec![],
165 },
166 })
167 }
168
169 #[instrument(skip(client))]
179 pub async fn from(
180 client: &'a Client,
181 upload_id: String,
182 dst: &S3Object,
183 part_size: usize,
184 max_uploading_parts: Option<usize>,
185 ) -> anyhow::Result<AsyncMultipartUpload<'a>> {
186 event!(Level::DEBUG, "New AsyncMultipartUpload");
187 if part_size < MIN_PART_SIZE {
188 anyhow::bail!("part_size was {part_size}, can not be less than {MIN_PART_SIZE}")
189 }
190 if part_size > MAX_PART_SIZE {
191 anyhow::bail!("part_size was {part_size}, can not be more than {MAX_PART_SIZE}")
192 }
193
194 if let Some(0) = max_uploading_parts {
196 anyhow::bail!("Max uploading parts must not be 0")
197 }
198
199 let mut parts: Vec<Part> = vec![];
200
201 let mut list_parts_result = client
202 .list_parts()
203 .bucket(&dst.bucket)
204 .key(&dst.key)
205 .upload_id(&upload_id)
206 .into_paginator()
207 .send();
208
209 while let Some(Ok(page)) = list_parts_result.next().await {
214 parts.append(&mut page.parts().to_vec());
215
216 if !page.is_truncated().unwrap_or(false) {
217 break;
218 }
219 }
220
221 let completed_parts: Vec<CompletedPart> = parts
222 .iter()
223 .map(|part| {
224 let e_tag = part.e_tag().map(|s| s.to_string());
228 let checksum_crc32 = part.checksum_crc32().map(|s| s.to_string());
229 let checksum_crc32_c = part.checksum_crc32_c().map(|s| s.to_string());
230 let checksum_sha1 = part.checksum_sha1().map(|s| s.to_string());
231 let checksum_sha256 = part.checksum_sha256().map(|s| s.to_string());
232
233 Ok::<CompletedPart, anyhow::Error>(
234 CompletedPart::builder()
235 .set_e_tag(e_tag)
236 .set_checksum_crc32(checksum_crc32)
237 .set_checksum_crc32_c(checksum_crc32_c)
238 .set_checksum_sha1(checksum_sha1)
239 .set_checksum_sha256(checksum_sha256)
240 .part_number(part.part_number().context("Expected part number")?)
241 .build(),
242 )
243 })
244 .collect::<anyhow::Result<Vec<_>>>()?;
245
246 let latest_part_number = parts
247 .iter()
248 .filter_map(|p| p.part_number())
249 .max()
250 .unwrap_or(0)
251 + 1;
252
253 Ok(AsyncMultipartUpload {
254 client,
255 dst: dst.clone(),
256 upload_id: upload_id.clone(),
257 part_size,
258 max_uploading_parts: max_uploading_parts.unwrap_or(DEFAULT_MAX_UPLOADING_PARTS),
259 state: AsyncMultipartUploadState::Writing {
260 uploads: vec![],
261 buffer: Vec::with_capacity(part_size),
262 part_number: latest_part_number,
263 completed_parts,
264 },
265 })
266 }
267
268 pub fn get_upload_id(&self) -> &str {
269 self.upload_id.as_str()
270 }
271
272 #[instrument(skip(buffer))]
273 fn upload_part<'b>(&self, buffer: Vec<u8>, part_number: i32) -> MultipartUploadFuture<'b> {
274 event!(Level::DEBUG, "Uploading Part");
275 self.client
276 .upload_part()
277 .bucket(&self.dst.bucket)
278 .key(&self.dst.key)
279 .upload_id(&self.upload_id)
280 .part_number(part_number)
281 .body(ByteStream::from(buffer))
282 .send()
283 .map_ok(move |p| (p, part_number))
284 .boxed()
285 }
286
287 fn poll_all<T>(futures: &mut Vec<BoxFuture<T>>, cx: &mut Context) -> Vec<T> {
288 let mut pending = vec![];
289 let mut complete = vec![];
290
291 while let Some(mut f) = futures.pop() {
292 match Pin::new(&mut f).poll(cx) {
293 Poll::Ready(result) => complete.push(result),
294 Poll::Pending => pending.push(f),
295 }
296 }
297 futures.extend(pending);
298 complete
299 }
300
301 #[instrument]
302 fn try_collect_complete_parts(
303 complete_results: Vec<Result<(UploadPartOutput, i32), SdkError<UploadPartError>>>,
304 ) -> Result<Vec<CompletedPart>, Error> {
305 complete_results
306 .into_iter()
307 .map(|r| r.map_err(Error::other))
308 .map(|r| {
309 r.map(|(c, part_number)| {
310 CompletedPart::builder()
311 .set_e_tag(c.e_tag)
312 .part_number(part_number)
313 .build()
314 })
315 })
316 .collect::<Result<Vec<_>, _>>()
317 }
318
319 #[instrument]
320 fn complete_multipart_upload<'b>(
321 &self,
322 completed_parts: Vec<CompletedPart>,
323 ) -> CompleteMultipartUploadFuture<'b> {
324 self.client
325 .complete_multipart_upload()
326 .key(&self.dst.key)
327 .bucket(&self.dst.bucket)
328 .upload_id(&self.upload_id)
329 .multipart_upload(
330 CompletedMultipartUpload::builder()
331 .set_parts(Some(completed_parts))
332 .build(),
333 )
334 .send()
335 .boxed()
336 }
337
338 #[instrument(skip(uploads))]
339 fn check_uploads(
340 uploads: &mut Vec<MultipartUploadFuture<'a>>,
341 completed_parts: &mut Vec<CompletedPart>,
342 cx: &mut Context,
343 ) -> Result<(), Error> {
344 let complete_results = AsyncMultipartUpload::poll_all(uploads, cx);
345 let new_completed_parts =
346 AsyncMultipartUpload::try_collect_complete_parts(complete_results)?;
347 completed_parts.extend(new_completed_parts);
348 Ok(())
349 }
350}
351
352impl<'a> AsyncWrite for AsyncMultipartUpload<'a> {
353 #[instrument(skip(self, cx, buf))]
354 fn poll_write(
355 mut self: Pin<&mut AsyncMultipartUpload<'a>>,
356 cx: &mut Context<'_>,
357 buf: &[u8],
358 ) -> Poll<Result<usize, Error>> {
359 event!(Level::DEBUG, "Polling write");
362 let state = std::mem::replace(&mut self.state, AsyncMultipartUploadState::None);
363 match state {
364 AsyncMultipartUploadState::Writing {
365 mut uploads,
366 mut buffer,
367 mut part_number,
368 mut completed_parts,
369 } => {
370 event!(Level::DEBUG, "Polling write while Writing");
371 AsyncMultipartUpload::check_uploads(&mut uploads, &mut completed_parts, cx)?;
373 let upload_capacity =
375 ((self.max_uploading_parts - uploads.len()) * self.part_size) - buffer.len();
376 let bytes_to_write = std::cmp::min(upload_capacity, buf.len());
377 if bytes_to_write == 0 {
379 uploads.is_empty().then(|| cx.waker().wake_by_ref());
380 self.state = AsyncMultipartUploadState::Writing {
381 uploads,
382 buffer,
383 part_number,
384 completed_parts,
385 };
386 return Poll::Pending;
387 }
388 buffer.extend(&buf[..bytes_to_write]);
389
390 while buffer.len() >= self.part_size {
392 event!(Level::DEBUG, "Starting a new part upload");
393 let mut part = buffer.split_off(self.part_size);
394 std::mem::swap(&mut buffer, &mut part);
398 let part_upload = self.upload_part(part, part_number);
400 uploads.push(part_upload);
401 part_number += 1;
402 }
403 AsyncMultipartUpload::check_uploads(&mut uploads, &mut completed_parts, cx)?;
405 self.state = AsyncMultipartUploadState::Writing {
407 uploads,
408 buffer,
409 part_number,
410 completed_parts,
411 };
412 Poll::Ready(Ok(bytes_to_write))
414 }
415 AsyncMultipartUploadState::None => Poll::Ready(Err(Error::other(
416 "Attempted to .write() when state is None",
417 ))),
418 _ => Poll::Ready(Err(Error::other("Attempted to .write() after .close()."))),
419 }
420 }
421
422 #[instrument(skip(self, cx))]
423 fn poll_flush(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Result<(), Error>> {
424 match &mut self.state {
426 AsyncMultipartUploadState::Writing {
427 uploads,
428 completed_parts,
429 ..
430 } => {
431 event!(Level::DEBUG, "Flushing Multipart Uploads");
432 AsyncMultipartUpload::check_uploads(uploads, completed_parts, cx)?;
434 if uploads.is_empty() {
435 event!(Level::DEBUG, "All part uploads are complete");
436 Poll::Ready(Ok(()))
437 } else {
438 event!(Level::DEBUG, "Waiting for uploads to complete");
439 Poll::Pending
441 }
442 }
443 AsyncMultipartUploadState::None => Poll::Ready(Err(Error::other(
444 "Attempted to .write() when state is None",
445 ))),
446 _ => Poll::Ready(Err(Error::other(
447 "Attempted to .flush() writer after .close().",
448 ))),
449 }
450 }
451
452 #[instrument(skip(self, cx))]
453 fn poll_close<'b>(
454 mut self: Pin<&'b mut AsyncMultipartUpload<'a>>,
455 cx: &'b mut Context<'_>,
456 ) -> Poll<Result<(), Error>> {
457 event!(Level::DEBUG, "Closing Multipart Uploads");
458 let state = std::mem::replace(&mut self.state, AsyncMultipartUploadState::None);
459 match state {
460 AsyncMultipartUploadState::Writing {
461 mut buffer,
462 mut uploads,
463 mut completed_parts,
464 part_number,
465 } => {
466 event!(Level::DEBUG, "Creating final Part Upload");
467 AsyncMultipartUpload::check_uploads(&mut uploads, &mut completed_parts, cx)?;
469 if self.max_uploading_parts - uploads.len() == 0 {
470 event!(Level::DEBUG, "Waiting for available upload capacity");
471 self.state = AsyncMultipartUploadState::Writing {
472 buffer,
473 uploads,
474 completed_parts,
475 part_number,
476 };
477 return Poll::Pending;
478 }
479 if !buffer.is_empty() {
480 let buff = mem::take(&mut buffer);
481 let part = self.upload_part(buff, part_number);
482 uploads.push(part);
483 }
484 AsyncMultipartUpload::check_uploads(&mut uploads, &mut completed_parts, cx)?;
486 uploads.is_empty().then(|| cx.waker().wake_by_ref());
488 self.state = AsyncMultipartUploadState::CompletingParts {
490 uploads,
491 completed_parts,
492 };
493 Poll::Pending
494 }
495 AsyncMultipartUploadState::CompletingParts {
496 uploads,
497 mut completed_parts,
498 } if uploads.is_empty() => {
499 event!(
500 Level::DEBUG,
501 "AsyncS3Upload all parts uploaded, Completing Upload"
502 );
503 completed_parts.sort_by_key(|p| p.part_number());
506 let completing = self.complete_multipart_upload(completed_parts);
507 self.state = AsyncMultipartUploadState::Completing(completing);
508 cx.waker().wake_by_ref();
510 Poll::Pending
511 }
512 AsyncMultipartUploadState::CompletingParts {
513 mut uploads,
514 mut completed_parts,
515 } => {
516 event!(
517 Level::DEBUG,
518 "AsyncS3Upload Waiting for All Parts to Upload"
519 );
520 AsyncMultipartUpload::check_uploads(&mut uploads, &mut completed_parts, cx)?;
522 uploads.is_empty().then(|| cx.waker().wake_by_ref());
524 self.state = AsyncMultipartUploadState::CompletingParts {
525 uploads,
526 completed_parts,
527 };
528 Poll::Pending
529 }
530 AsyncMultipartUploadState::Completing(mut fut) => {
531 event!(Level::DEBUG, "Waiting for upload complete to finish");
534 match Pin::new(&mut fut).poll(cx) {
535 Poll::Pending => {
536 self.state = AsyncMultipartUploadState::Completing(fut);
537 Poll::Pending
538 }
539 Poll::Ready(Ok(_)) => {
540 self.state = AsyncMultipartUploadState::Closed;
541 Poll::Ready(Ok(()))
542 }
543 Poll::Ready(Err(e)) => {
544 self.state = AsyncMultipartUploadState::Closed;
545 Poll::Ready(Err(Error::other(e)))
546 }
547 }
548 }
549 AsyncMultipartUploadState::None => Poll::Ready(Err(Error::other(
550 "Attempted to .close() writer with state None",
551 ))),
552 AsyncMultipartUploadState::Closed => Poll::Ready(Err(Error::other(
553 "Attempted to .close() writer after .close().",
554 ))),
555 }
556 }
557}
558
559#[cfg(test)]
560mod tests {
561 use super::*;
562 use crate::config::load_from_env;
563 use crate::localstack;
564 use crate::s3::test::*;
565 use crate::s3::{AsyncMultipartUpload, S3Object};
566 use ::function_name::named;
567 use anyhow::Result;
568 use aws_sdk_s3::Client;
569 use bytesize::MIB;
570 use futures::prelude::*;
571
572 #[tokio::test]
573 async fn test_part_size_too_small() {
574 let shared_config = load_from_env().await.unwrap();
575 let client = Client::new(&shared_config);
576 let dst = S3Object::new("bucket", "key");
577 assert!(AsyncMultipartUpload::new(&client, &dst, 0_usize, None)
578 .await
579 .is_err())
580 }
581
582 #[tokio::test]
583 async fn test_part_size_too_big() {
584 let shared_config = load_from_env().await.unwrap();
585 let client = Client::new(&shared_config);
586 let dst = S3Object::new("bucket", "key");
587 assert!(
588 AsyncMultipartUpload::new(&client, &dst, 5 * GIB as usize + 1, None)
589 .await
590 .is_err()
591 )
592 }
593
594 #[tokio::test]
595 async fn test_max_uploading_parts_is_zero() {
596 let shared_config = load_from_env().await.unwrap();
597 let client = Client::new(&shared_config);
598 let dst = S3Object::new("bucket", "key");
599 assert!(
600 AsyncMultipartUpload::new(&client, &dst, 5 * MIB as usize, Some(0))
601 .await
602 .is_err()
603 )
604 }
605
606 async fn localstack_test_client() -> Client {
610 localstack::test_utils::wait_for_localstack().await;
611 let shared_config = crate::config::load_from_env().await.unwrap();
612 let builder = aws_sdk_s3::config::Builder::from(&shared_config)
613 .force_path_style(true)
614 .build();
615 Client::from_conf(builder)
616 }
617
618 #[tokio::test]
619 #[named]
620 async fn test_put_single_part() -> Result<()> {
621 let client = localstack_test_client().await;
622 let test_bucket = "test-multipart-bucket";
623 let mut rng = seeded_rng(function_name!());
624 let dst_key = gen_random_file_name(&mut rng);
625
626 create_bucket(&client, test_bucket).await.unwrap();
627 let buffer_len = MIB as usize;
628
629 let dst = S3Object::new(test_bucket, &dst_key);
630 let mut upload = AsyncMultipartUpload::new(&client, &dst, 5_usize * MIB as usize, None)
631 .await
632 .unwrap();
633 upload.write_all(&vec![0; buffer_len]).await.unwrap();
634 upload.close().await.unwrap();
635 let body = fetch_bytes(&client, &dst).await.unwrap();
636 assert_eq!(body.len(), buffer_len);
637 Ok(())
638 }
639
640 #[tokio::test]
641 #[named]
642 async fn test_import_and_continue_an_upload() -> Result<()> {
643 let client = localstack_test_client().await;
644 let test_bucket = "test-multipart-bucket";
645 let mut rng = seeded_rng(function_name!());
646 let dst_key = gen_random_file_name(&mut rng);
647
648 create_bucket(&client, test_bucket).await.unwrap();
649 let buffer_len = 5_usize * MIB as usize;
650
651 let dst = S3Object::new(test_bucket, &dst_key);
653 let mut upload = AsyncMultipartUpload::new(&client, &dst, 5_usize * MIB as usize, None)
654 .await
655 .unwrap();
656 upload.write_all(&vec![0; buffer_len]).await.unwrap();
657 upload.flush().await.unwrap();
658
659 let mut resumed_upload = AsyncMultipartUpload::from(
660 &client,
661 upload.get_upload_id().to_string(),
662 &dst,
663 5_usize * MIB as usize,
664 None,
665 )
666 .await
667 .unwrap();
668
669 resumed_upload
670 .write_all(&vec![0; buffer_len])
671 .await
672 .unwrap();
673 resumed_upload.close().await.unwrap();
674
675 let final_body = fetch_bytes(&client, &dst).await.unwrap();
677 assert_eq!(final_body.len(), 2_usize * buffer_len);
678 Ok(())
679 }
680
681 #[tokio::test]
682 #[named]
683 async fn test_put_10mb() -> Result<()> {
684 let client = localstack_test_client().await;
685 let test_bucket = "test-multipart-bucket";
686 let mut rng = seeded_rng(function_name!());
687 let dst_key = gen_random_file_name(&mut rng);
688
689 create_bucket(&client, test_bucket).await.unwrap();
690 let buffer_len = 10 * MIB as usize;
691
692 let dst = S3Object::new(test_bucket, &dst_key);
693 let mut upload = AsyncMultipartUpload::new(&client, &dst, 5_usize * MIB as usize, None)
694 .await
695 .unwrap();
696 upload.write_all(&vec![0; buffer_len]).await.unwrap();
697 upload.close().await.unwrap();
698 let body = fetch_bytes(&client, &dst).await.unwrap();
699 assert_eq!(body.len(), buffer_len);
700 Ok(())
701 }
702 #[tokio::test]
703 #[named]
704 async fn test_put_14mb() -> Result<()> {
705 let client = localstack_test_client().await;
706 let test_bucket = "test-multipart-bucket";
707 let mut rng = seeded_rng(function_name!());
708 let dst_key = gen_random_file_name(&mut rng);
709
710 create_bucket(&client, test_bucket).await.unwrap();
711 let buffer_len = 14 * MIB as usize;
712
713 let dst = S3Object::new(test_bucket, &dst_key);
714 let mut upload = AsyncMultipartUpload::new(&client, &dst, 5_usize * MIB as usize, None)
715 .await
716 .unwrap();
717 upload.write_all(&vec![0; buffer_len]).await.unwrap();
718 upload.close().await.unwrap();
719 let body = fetch_bytes(&client, &dst).await.unwrap();
720 assert_eq!(body.len(), buffer_len);
721 Ok(())
722 }
723
724 #[tokio::test]
725 #[named]
726 async fn test_put_flush() -> Result<()> {
727 let client = localstack_test_client().await;
728 let test_bucket = "test-multipart-bucket";
729 let mut rng = seeded_rng(function_name!());
730 let dst_key = gen_random_file_name(&mut rng);
731
732 create_bucket(&client, test_bucket).await.unwrap();
733 let buffer_len = MIB as usize;
734
735 let dst = S3Object::new(test_bucket, &dst_key);
736 let mut upload = AsyncMultipartUpload::new(&client, &dst, 5_usize * MIB as usize, Some(1))
737 .await
738 .unwrap();
739 upload.write_all(&vec![0; buffer_len]).await.unwrap();
740 upload.flush().await.unwrap();
741 upload.close().await.unwrap();
742 let body = fetch_bytes(&client, &dst).await.unwrap();
743 assert_eq!(body.len(), buffer_len);
744 Ok(())
745 }
746
747 #[tokio::test]
748 #[named]
749 async fn test_put_double_close() -> Result<()> {
750 let client = localstack_test_client().await;
751 let test_bucket = "test-multipart-bucket";
752 let mut rng = seeded_rng(function_name!());
753 let dst_key = gen_random_file_name(&mut rng);
754
755 create_bucket(&client, test_bucket).await.unwrap();
756 let buffer_len = 100_usize;
757
758 let dst = S3Object::new(test_bucket, &dst_key);
759 let mut upload = AsyncMultipartUpload::new(&client, &dst, 5_usize * MIB as usize, Some(1))
760 .await
761 .unwrap();
762 upload.write_all(&vec![0; buffer_len]).await.unwrap();
763 upload.close().await.unwrap();
764 assert!(upload.close().await.is_err());
765 Ok(())
766 }
767
768 #[tokio::test]
769 #[named]
770 async fn test_put_write_after_close() -> Result<()> {
771 let client = localstack_test_client().await;
772 let test_bucket = "test-multipart-bucket";
773 let mut rng = seeded_rng(function_name!());
774 let dst_key = gen_random_file_name(&mut rng);
775
776 create_bucket(&client, test_bucket).await.unwrap();
777 let buffer_len = 100_usize;
778
779 let dst = S3Object::new(test_bucket, &dst_key);
780 let mut upload = AsyncMultipartUpload::new(&client, &dst, 5_usize * MIB as usize, Some(1))
781 .await
782 .unwrap();
783 upload.write_all(&vec![0; buffer_len]).await.unwrap();
784 upload.close().await.unwrap();
785 assert!(upload.write_all(&vec![0; buffer_len]).await.is_err());
786 Ok(())
787 }
788
789 #[tokio::test]
790 #[named]
791 async fn test_put_16mb_single_upload() {
792 let client = localstack_test_client().await;
793 let test_bucket = "test-multipart-bucket";
794 let mut rng = seeded_rng(function_name!());
795 let dst_key = gen_random_file_name(&mut rng);
796
797 create_bucket(&client, test_bucket).await.unwrap();
798
799 let dst = S3Object::new(test_bucket, &dst_key);
800 let mut upload = AsyncMultipartUpload::new(&client, &dst, 5 * MIB as usize, Some(1))
801 .await
802 .unwrap();
803
804 let data_len = 16 * MIB as usize;
805
806 upload.write_all(&vec![0; data_len]).await.unwrap();
807 upload.close().await.unwrap();
808 let bytes = fetch_bytes(&client, &S3Object::new(test_bucket, &dst_key))
809 .await
810 .unwrap();
811 assert_eq!(data_len, bytes.len())
812 }
813}