Skip to main content

aws_multipart_upload/write/
multipart_upload.rs

1use std::fmt::{self, Debug, Formatter};
2use std::pin::Pin;
3use std::task::{Context, Poll};
4use std::time::{Duration, Instant};
5
6use bytesize::ByteSize;
7use futures::ready;
8use multipart_write::{FusedMultipartWrite, MultipartWrite};
9
10use super::part_buffer::PartBuffer;
11use super::{ShouldComplete, Uploaded};
12use crate::client::part::{CompletedParts, PartBody, PartNumber};
13use crate::client::request::*;
14use crate::client::{UploadClient, UploadData};
15use crate::error::{Error, ErrorWithUpload as _};
16use crate::uri::{ObjectUri, ObjectUriIter};
17
/// Progress report handed back by `Upload` each time a part is sent
/// successfully.
#[derive(Clone, Copy, Debug)]
pub struct UploadStatus {
    /// Most recently reported byte count of the parts that were added to the
    /// upload successfully.
    pub upload_bytes: u64,
    /// Combined size in bytes of every part sent so far.
    pub total_bytes: u64,
    /// Count of every part sent so far.
    pub total_parts: u64,
    /// Time elapsed since this upload started.
    pub duration: Duration,
    /// `true` once the current upload ought to be completed.
    pub should_complete: bool,
}
33
34impl ShouldComplete for UploadStatus {
35    fn should_complete(&self) -> bool {
36        self.should_complete
37    }
38}
39
40/// `MultipartUpload` is a multipart writer that also manages state transition
41/// of `UploadInner`.
42#[derive(Debug)]
43#[must_use = "futures do nothing unless polled"]
44#[pin_project::pin_project]
45pub(crate) struct MultipartUpload {
46    #[pin]
47    inner: UploadInner,
48    #[pin]
49    fut: Option<SendCompleteUpload>,
50    client: UploadClient,
51    iter: ObjectUriIter,
52    state: GlobalState,
53    config: GlobalConfig,
54}
55
56impl MultipartUpload {
57    pub(crate) fn new(
58        client: UploadClient,
59        mut iter: ObjectUriIter,
60        bytes: ByteSize,
61        capacity: Option<usize>,
62    ) -> Self {
63        let max_bytes = bytes.as_u64();
64        let config = GlobalConfig { max_bytes, capacity };
65        let inner = UploadInner::create_upload_maybe(&client, iter.next());
66        let state = GlobalState::new(config);
67        Self { inner, fut: None, client, iter, state, config }
68    }
69}
70
71impl FusedMultipartWrite<PartBody> for MultipartUpload {
72    fn is_terminated(&self) -> bool {
73        self.inner.is_terminated()
74    }
75}
76
77impl MultipartWrite<PartBody> for MultipartUpload {
78    type Error = Error;
79    type Output = Uploaded;
80    type Recv = UploadStatus;
81
82    fn poll_ready(
83        mut self: Pin<&mut Self>,
84        cx: &mut Context<'_>,
85    ) -> Poll<Result<(), Self::Error>> {
86        let mut this = self.as_mut().project();
87        match this.inner.as_mut().project() {
88            // Can't send parts if there is no upload.
89            UploadProj::Pending(fut) => {
90                let data = ready!(fut.poll(cx))?;
91                trace!(
92                    id = %&data.id,
93                    uri = %&data.uri,
94                    "new upload created",
95                );
96                let new_inner = UploadInner::active(
97                    this.client,
98                    data,
99                    this.config.capacity,
100                );
101                *this.state = GlobalState::new(*this.config);
102                this.inner.set(new_inner);
103                Poll::Ready(Ok(()))
104            },
105            UploadProj::Active(upl) => upl.poll_ready(cx),
106            UploadProj::Terminated => {
107                Poll::Ready(Err(Error::state("polled upload after completion")))
108            },
109        }
110    }
111
112    fn start_send(
113        mut self: Pin<&mut Self>,
114        part: PartBody,
115    ) -> Result<Self::Recv, Self::Error> {
116        let this = self.as_mut().project();
117        let upl =
118            this.inner.get_active_proj().expect("called send with no upload");
119        let recv = upl.start_send(part)?;
120        Ok(this.state.upload_status(recv))
121    }
122
123    fn poll_flush(
124        mut self: Pin<&mut Self>,
125        cx: &mut Context<'_>,
126    ) -> Poll<Result<(), Self::Error>> {
127        let this = self.as_mut().project();
128        // It should not be the case that `poll_flush` is called without an
129        // active upload. We've finished an upload in this case and are
130        // waiting for the next one, so flushing when there hasn't been
131        // anything written doesn't make sense.
132        //
133        // I's not against the law though, and it makes sense to return
134        // immediately if there's nothing to flush, so that's what we
135        // do.
136        let Some(upl) = this.inner.get_active_proj() else {
137            return Poll::Ready(Ok(()));
138        };
139        upl.poll_flush(cx)
140    }
141
142    fn poll_complete(
143        mut self: Pin<&mut Self>,
144        cx: &mut Context<'_>,
145    ) -> Poll<Result<Self::Output, Self::Error>> {
146        let mut this = self.as_mut().project();
147        // First poll drains active requests and sets the CompleteUpload future.
148        if this.fut.is_none() {
149            let upl = this
150                .inner
151                .as_mut()
152                .get_active_proj()
153                .expect("called complete with no upload");
154            let out = ready!(upl.poll_complete(cx))?;
155            trace!(
156                id = %&out.data.id,
157                uri = %&out.data.uri,
158                bytes = out.parts.size(),
159                parts = out.parts.count(),
160                "starting complete upload",
161            );
162            let req = CompleteRequest::new(&out.data, out.parts);
163            let fut = SendCompleteUpload::new(this.client, req);
164            this.fut.set(Some(fut));
165        }
166        let fut = this.fut.as_mut().as_pin_mut().unwrap();
167        let resp = ready!(fut.poll(cx))?;
168        let output = this.state.uploaded(resp);
169        // Now transition `UploadInner` to be `Pending` a new upload.
170        //
171        // If there isn't another URI to get, `Terminated` is the state instead.
172        // Either way this value should have a valid state when the method
173        // returns, just if it's `Terminated` the fuse behavior will mean it's
174        // not polled again.
175        let next = this.iter.next();
176        trace!(
177            uri = %&output.uri,
178            etag = %&output.etag,
179            terminated = next.is_none(),
180            "completed upload",
181        );
182        let new_inner = UploadInner::create_upload_maybe(this.client, next);
183        this.fut.set(None);
184        this.inner.set(new_inner);
185        Poll::Ready(Ok(output))
186    }
187}
188
189#[derive(Debug)]
190#[must_use = "futures do nothing unless polled"]
191#[pin_project::pin_project(project = UploadProj)]
192enum UploadInner {
193    Active(#[pin] UploadParts),
194    Pending(#[pin] SendCreateUpload),
195    Terminated,
196}
197
198impl UploadInner {
199    fn active(
200        client: &UploadClient,
201        data: UploadData,
202        capacity: Option<usize>,
203    ) -> Self {
204        Self::Active(UploadParts::new(client, data, capacity))
205    }
206
207    /// Try to create a new upload, but return `Terminated` if the URI provided
208    /// is `None`.
209    fn create_upload_maybe(
210        client: &UploadClient,
211        uri: Option<ObjectUri>,
212    ) -> Self {
213        let Some(uri) = uri else {
214            return Self::Terminated;
215        };
216        let req = CreateRequest::new(uri);
217        let fut = SendCreateUpload::new(client, req);
218        Self::Pending(fut)
219    }
220
221    fn is_terminated(&self) -> bool {
222        matches!(self, Self::Terminated)
223    }
224
225    fn get_active_proj(self: Pin<&mut Self>) -> Option<Pin<&mut UploadParts>> {
226        let UploadProj::Active(upl) = self.project() else {
227            return None;
228        };
229        Some(upl)
230    }
231}
232
233/// The main state of `Upload` manages polling the buffer of request futures.
234#[must_use = "futures do nothing unless polled"]
235#[pin_project::pin_project]
236struct UploadParts {
237    #[pin]
238    buf: PartBuffer,
239    client: UploadClient,
240    data: UploadData,
241    parts: CompletedParts,
242    current: PartNumber,
243    is_terminated: bool,
244}
245
246impl UploadParts {
247    fn new(
248        client: &UploadClient,
249        data: UploadData,
250        capacity: Option<usize>,
251    ) -> Self {
252        Self {
253            buf: PartBuffer::new(capacity),
254            client: client.clone(),
255            data,
256            parts: CompletedParts::default(),
257            current: PartNumber::new(),
258            is_terminated: false,
259        }
260    }
261}
262
263impl FusedMultipartWrite<PartBody> for UploadParts {
264    fn is_terminated(&self) -> bool {
265        self.buf.is_terminated() || self.is_terminated
266    }
267}
268
269impl MultipartWrite<PartBody> for UploadParts {
270    type Error = Error;
271    type Output = PartBufOutput;
272    type Recv = PartBufRecv;
273
274    fn poll_ready(
275        self: Pin<&mut Self>,
276        cx: &mut Context<'_>,
277    ) -> Poll<Result<(), Self::Error>> {
278        let this = self.project();
279        ready!(this.buf.poll_ready(cx)).err_with_upl(
280            &this.data.id,
281            &this.data.uri,
282            this.parts,
283        )?;
284        Poll::Ready(Ok(()))
285    }
286
287    fn start_send(
288        self: Pin<&mut Self>,
289        body: PartBody,
290    ) -> Result<Self::Recv, Self::Error> {
291        let mut this = self.project();
292        // Increments the part number while returning the current value.
293        let ptnum = this.current.fetch_incr();
294        let ptb = body.size();
295        let req = UploadPartRequest::new(this.data, body, ptnum);
296        let fut = SendUploadPart::new(this.client, req);
297        let upb = this.buf.as_mut().start_send(fut)?;
298        trace!(
299            id = %&this.data.id,
300            uri = %&this.data.uri,
301            part_number = %ptnum,
302            part_bytes = ptb,
303            upload_bytes = upb,
304            "sent part upload",
305        );
306        Ok(PartBufRecv::new(upb, ptb))
307    }
308
309    fn poll_flush(
310        self: Pin<&mut Self>,
311        cx: &mut Context<'_>,
312    ) -> Poll<Result<(), Self::Error>> {
313        let this = self.project();
314        let completed = ready!(this.buf.poll_complete(cx)).err_with_upl(
315            &this.data.id,
316            &this.data.uri,
317            this.parts,
318        )?;
319        this.parts.append(completed);
320        Poll::Ready(Ok(()))
321    }
322
323    fn poll_complete(
324        self: Pin<&mut Self>,
325        cx: &mut Context<'_>,
326    ) -> Poll<Result<Self::Output, Self::Error>> {
327        let this = self.project();
328        let completed = ready!(this.buf.poll_complete(cx)).err_with_upl(
329            &this.data.id,
330            &this.data.uri,
331            this.parts,
332        )?;
333        let mut parts = std::mem::take(this.parts);
334        parts.append(completed);
335        let data = this.data.clone();
336        trace!(
337            id = %&data.id,
338            uri = %&data.uri,
339            bytes = parts.size(),
340            parts = parts.count(),
341            "finished uploading parts",
342        );
343        // To ensure this isn't accidentally polled again.
344        *this.is_terminated = true;
345        Poll::Ready(Ok(PartBufOutput { parts, data }))
346    }
347}
348
349impl Debug for UploadParts {
350    fn fmt(&self, f: &mut Formatter<'_>) -> fmt::Result {
351        f.debug_struct("UploadParts")
352            .field("buf", &self.buf)
353            .field("client", &self.client)
354            .field("data", &self.data)
355            .field("parts", &self.parts)
356            .field("current", &self.current)
357            .field("is_terminated", &self.is_terminated)
358            .finish()
359    }
360}
361
/// Caller-chosen limits that apply to each upload.
#[derive(Clone, Copy, Debug)]
struct GlobalConfig {
    /// Byte total at or above which an upload reports it should complete.
    max_bytes: u64,
    /// Capacity of each upload's future buffer, i.e. its concurrency limit.
    capacity: Option<usize>,
}
369
370/// To track overall upload progress.
371#[derive(Debug, Clone, Copy)]
372struct GlobalState {
373    total_bytes: u64,
374    total_parts: u64,
375    start: Instant,
376    config: GlobalConfig,
377}
378
379impl GlobalState {
380    fn new(config: GlobalConfig) -> Self {
381        Self { total_bytes: 0, total_parts: 0, start: Instant::now(), config }
382    }
383
384    fn should_complete(&self) -> bool {
385        self.total_bytes >= self.config.max_bytes
386    }
387
388    fn upload_status(&mut self, recv: PartBufRecv) -> UploadStatus {
389        let part_bytes = recv.part_bytes as u64;
390        self.total_bytes += part_bytes;
391        self.total_parts += 1;
392        UploadStatus {
393            total_bytes: self.total_bytes,
394            total_parts: self.total_parts,
395            upload_bytes: recv.upload_bytes as u64,
396            duration: self.start.elapsed(),
397            should_complete: self.should_complete(),
398        }
399    }
400
401    fn uploaded(&self, resp: CompletedUpload) -> Uploaded {
402        Uploaded {
403            uri: resp.uri,
404            etag: resp.etag,
405            bytes: self.total_bytes,
406            parts: self.total_parts,
407            items: None,
408            duration: self.start.elapsed(),
409        }
410    }
411}
412
/// Byte counts reported by `UploadParts::start_send` for a single part.
#[derive(Clone, Copy, Debug)]
struct PartBufRecv {
    /// Byte count returned by the part buffer when the part was queued.
    upload_bytes: usize,
    /// Size of the part body that was just sent.
    part_bytes: usize,
}
418
419impl PartBufRecv {
420    fn new(upload_bytes: usize, part_bytes: usize) -> Self {
421        Self { upload_bytes, part_bytes }
422    }
423}
424
425#[derive(Debug, Clone)]
426struct PartBufOutput {
427    parts: CompletedParts,
428    data: UploadData,
429}