// aws_multipart_upload/upload.rs

//! Core types for multipart uploads.
//!
//! The module defines [`Upload`] and [`EncodeUpload`], which are values that
//! can perform the complete multipart upload, along with a builder for them.
5use std::pin::Pin;
6use std::task::{Context, Poll};
7
8use bytesize::ByteSize;
9use multipart_write::{FusedMultipartWrite, MultipartWrite};
10
11use crate::client::part::PartBody;
12use crate::client::{UploadApi, UploadClient};
13use crate::encoder::PartEncoder;
14use crate::error::Error;
15use crate::uri::{EmptyUri, ObjectUri, ObjectUriIter, OneTimeUse};
16use crate::write::multipart_upload::MultipartUpload;
17use crate::write::with_part_encoder::WithPartEncoder;
18
19pub use crate::write::multipart_upload::UploadStatus;
20pub use crate::write::with_part_encoder::EncoderStatus;
21pub use crate::write::{ShouldComplete, Uploaded};
22
pub mod stream {
    //! Extensions to `Stream` for multipart uploads.
    //!
    //! Re-exports the stream combinators from the internal `write` module so
    //! callers can drive an [`Upload`](super::Upload) from a `Stream` of items.
    pub use crate::write::{CollectUpload, TryUploadWhen, UploadStreamExt};
}
27
// AWS S3 multipart upload limits.
// https://docs.aws.amazon.com/AmazonS3/latest/userguide/qfacts.html
//
// 48,800 GiB = 48.8 TiB, i.e. 10,000 parts at the 5 GiB per-part maximum
// (matches the "maximum is 48.8TiB" note on `with_upload_size`).
// NOTE(review): the qfacts page lists 5 TiB as the maximum *object* size;
// confirm the 48.8 TiB figure is intentionally the theoretical multipart bound.
pub(crate) const AWS_MAX_OBJECT_SIZE: ByteSize = ByteSize::gib(48800);
// Minimum part size AWS accepts (except for the final part of an upload).
pub(crate) const AWS_MIN_PART_SIZE: ByteSize = ByteSize::mib(5);
// Maximum size of any single part.
pub(crate) const AWS_MAX_PART_SIZE: ByteSize = ByteSize::gib(5);
// Library defaults; overridable via `UploadBuilder::with_upload_size` and
// `UploadBuilder::with_part_size`.
pub(crate) const DEFAULT_MAX_OBJECT_SIZE: ByteSize = ByteSize::mib(128);
pub(crate) const DEFAULT_MAX_PART_SIZE: ByteSize = ByteSize::mib(10);
34
/// `UploadBuilder` builds an `Upload`.
#[derive(Debug)]
pub struct UploadBuilder {
    // Client that issues the multipart upload API requests.
    client: UploadClient,
    // Target total size of one upload; defaults to 128 MiB.
    max_bytes: ByteSize,
    // Target size of a single part; defaults to 10 MiB.
    max_part_bytes: ByteSize,
    // Source of destination URIs, one per upload started.
    iter: ObjectUriIter,
    // Cap on concurrently pending part requests.
    // `None` or `Some(0)` means unlimited; defaults to `Some(10)`.
    capacity: Option<usize>,
}
44
45impl UploadBuilder {
46    /// New `UploadBuilder` with defaults.
47    pub fn new<C>(client: C) -> Self
48    where
49        C: UploadApi + 'static,
50    {
51        Self {
52            client: UploadClient::new(client),
53            max_bytes: DEFAULT_MAX_OBJECT_SIZE,
54            max_part_bytes: DEFAULT_MAX_PART_SIZE,
55            iter: ObjectUriIter::new(EmptyUri.into_iter()),
56            capacity: Some(10),
57        }
58    }
59
60    /// Set the target size of the upload. The maximum is 48.8TiB and the
61    /// default is 128MiB.
62    ///
63    /// The reason for the choice is it has to be something, and this was a good
64    /// rule of thumb for block size in Hadoop HDFS.
65    pub fn with_upload_size(self, limit: ByteSize) -> Self {
66        Self { max_bytes: limit.min(AWS_MAX_OBJECT_SIZE), ..self }
67    }
68
69    /// Set the target size of a part.  This has to be between 5MiB and 5GiB;
70    /// the default is 10MiB.
71    pub fn with_part_size(self, limit: ByteSize) -> Self {
72        Self {
73            // Clamp to AWS_MIN <= max_part_bytes <= min(AWS_MAX, usize::MAX).
74            max_part_bytes: limit
75                .max(AWS_MIN_PART_SIZE)
76                .min(AWS_MAX_PART_SIZE)
77                .min(ByteSize::b(usize::MAX as u64)),
78            ..self
79        }
80    }
81
82    /// Set the destination object URI for a single upload.
83    ///
84    /// The resulting [`Upload`] can be used only once.
85    pub fn with_uri<T: Into<ObjectUri>>(self, uri: T) -> Self {
86        let inner = OneTimeUse::new(uri.into());
87        Self { iter: ObjectUriIter::new(inner), ..self }
88    }
89
90    /// Set the destination object URI to be generated using the provided
91    /// iterator.
92    ///
93    /// The resulting [`Upload`] will be reusable for as long as the iterator
94    /// can produce the next `ObjectUri`.
95    pub fn with_uri_iter<I>(self, inner: I) -> Self
96    where
97        I: Iterator<Item = ObjectUri> + Send + Sync + 'static,
98    {
99        let iter = ObjectUriIter::new(inner);
100        Self { iter, ..self }
101    }
102
103    /// Set a limit to the number of active part upload requests that can exist
104    /// at one time.
105    ///
106    /// `None` or `Some(0)` is interpreted as "unlimited" capacity.  By
107    /// arbitrary choice the default is 10.
108    pub fn with_capacity<T: Into<Option<usize>>>(self, capacity: T) -> Self {
109        Self { capacity: capacity.into(), ..self }
110    }
111
112    /// Build an `Upload` from this configuration.
113    pub fn build(self) -> Upload {
114        Upload::new(self.client, self.iter, self.max_bytes, self.capacity)
115    }
116
117    /// Build an `EncodeUpload` that constructs the multipart upload from
118    /// `Item`s encoded with `E`.
119    pub fn build_encoded<Item, E>(self, encoder: E) -> EncodeUpload<Item, E> {
120        let bytes = self.max_bytes;
121        let part_bytes = self.max_part_bytes;
122        let upload = self.build();
123        EncodeUpload::new(upload, encoder, bytes, part_bytes)
124    }
125}
126
/// `Upload` manages the lifecycle of an AWS S3 multipart upload.
///
/// This type realizes [`MultipartWrite`] through the following:
///
/// * `poll_ready` ensures that there is a valid, active multipart upload, and
///   that the number of pending part upload requests does not exceed configured
///   capacity.
/// * `start_send` accepts a [`PartBody`], creates a part upload request future
///   from it, and pushes it to a collection of these, where the responses are
///   resolved asynchronously and independently.  Current statistics of the
///   upload are returned in the value [`UploadStatus`].
/// * `poll_flush` awaits all pending part upload request futures, draining the
///   collection.
/// * `poll_complete` flushes and then submits the request to complete the
///   multipart upload, returning the response as the value [`Uploaded`].
///
/// On completion, a new upload is started if possible.  This relies on the
/// ability of the [`ObjectUriIter`] that it was created with to generate the
/// next upload URI.  Polling for readiness ensures that the new upload has
/// resolved to an upload ID before allowing `start_send`.
///
/// # Fusing
///
/// If [`ObjectUriIter::next`] ever produces `None`, the multipart upload will
/// transition to a terminated state and it is not polled again.
///
/// [`MultipartWrite`]: multipart_write::MultipartWrite
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
#[pin_project::pin_project]
pub struct Upload {
    // Pinned driver holding all upload state; `Upload` is a thin wrapper
    // that forwards the `MultipartWrite` trait methods to it.
    #[pin]
    inner: MultipartUpload,
}
161
162impl Upload {
163    fn new<T: Into<Option<usize>>>(
164        client: UploadClient,
165        iter: ObjectUriIter,
166        upload_bytes: ByteSize,
167        capacity: T,
168    ) -> Self {
169        let inner =
170            MultipartUpload::new(client, iter, upload_bytes, capacity.into());
171        Self { inner }
172    }
173}
174
// Fuse state lives in the inner driver; see the "Fusing" section on `Upload`.
impl FusedMultipartWrite<PartBody> for Upload {
    fn is_terminated(&self) -> bool {
        self.inner.is_terminated()
    }
}
180
// Every trait method pin-projects to the inner `MultipartUpload`; this impl
// adds no behavior of its own.  See the `Upload` docs for the contract of
// each method.
impl MultipartWrite<PartBody> for Upload {
    type Error = Error;
    type Output = Uploaded;
    type Recv = UploadStatus;

    fn poll_ready(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        self.project().inner.poll_ready(cx)
    }

    fn start_send(
        self: Pin<&mut Self>,
        part: PartBody,
    ) -> Result<Self::Recv, Self::Error> {
        self.project().inner.start_send(part)
    }

    fn poll_flush(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        self.project().inner.poll_flush(cx)
    }

    fn poll_complete(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<Self::Output, Self::Error>> {
        self.project().inner.poll_complete(cx)
    }
}
214
/// `EncodeUpload` manages the lifecycle of an AWS S3 multipart upload built
/// from values `Item` using the value `E` to encode them.
///
/// Whereas [`Upload`] can only accept a [`PartBody`] that is ready to be sent,
/// `EncodeUpload` builds the part from `Item`s first.
#[derive(Debug)]
#[must_use = "futures do nothing unless polled"]
#[pin_project::pin_project]
pub struct EncodeUpload<Item, E> {
    // Pinned `Upload` wrapped with a part encoder; all trait methods forward
    // to this combined driver.
    #[pin]
    inner: WithPartEncoder<Upload, Item, E>,
}
227
228impl<Item, E> EncodeUpload<Item, E> {
229    fn new(
230        upload: Upload,
231        encoder: E,
232        bytes: ByteSize,
233        part_bytes: ByteSize,
234    ) -> EncodeUpload<Item, E> {
235        let inner = WithPartEncoder::new(upload, encoder, bytes, part_bytes);
236        EncodeUpload { inner }
237    }
238}
239
// Fuse state lives in the wrapped `Upload`; see the "Fusing" section there.
impl<Item, E: PartEncoder<Item>> FusedMultipartWrite<Item>
    for EncodeUpload<Item, E>
{
    fn is_terminated(&self) -> bool {
        self.inner.is_terminated()
    }
}
247
// Every trait method pin-projects to the inner `WithPartEncoder`, which
// encodes `Item`s into parts before handing them to the wrapped `Upload`.
// Note `Recv` is `EncoderStatus` here (encoder-side statistics), not the
// `UploadStatus` returned by `Upload` itself.
impl<Item, E: PartEncoder<Item>> MultipartWrite<Item>
    for EncodeUpload<Item, E>
{
    type Error = Error;
    type Output = Uploaded;
    type Recv = EncoderStatus;

    fn poll_ready(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        self.project().inner.poll_ready(cx)
    }

    fn start_send(
        self: Pin<&mut Self>,
        part: Item,
    ) -> Result<Self::Recv, Self::Error> {
        self.project().inner.start_send(part)
    }

    fn poll_flush(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<(), Self::Error>> {
        self.project().inner.poll_flush(cx)
    }

    fn poll_complete(
        self: Pin<&mut Self>,
        cx: &mut Context<'_>,
    ) -> Poll<Result<Self::Output, Self::Error>> {
        self.project().inner.poll_complete(cx)
    }
}